This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 314b418efea8f35d39b05abef5361289b054b6a7
Author: Jim Hughes <jhug...@confluent.io>
AuthorDate: Thu Dec 7 08:12:42 2023 -0500

    [FLINK-33757] Implement restore tests for Rank node
---
 .../plan/nodes/exec/stream/RankRestoreTest.java    |  42 +++
 .../plan/nodes/exec/stream/RankTestPrograms.java   | 187 +++++++++++++
 .../rank-n-test/plan/rank-n-test.json              | 301 +++++++++++++++++++++
 .../rank-n-test/savepoint/_metadata                | Bin 0 -> 12161 bytes
 .../plan/rank-test-append-fast-strategy.json}      | 111 +++-----
 .../savepoint/_metadata                            | Bin 0 -> 13768 bytes
 .../plan/rank-test-retract-strategy.json}          | 133 ++++-----
 .../rank-test-retract-strategy/savepoint/_metadata | Bin 0 -> 18147 bytes
 .../plan/rank-test-update-fast-strategy.json}      | 186 +++++++------
 .../savepoint/_metadata                            | Bin 0 -> 22387 bytes
 10 files changed, 728 insertions(+), 232 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java
new file mode 100644
index 00000000000..cb76c1ca723
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecRank}. */
+public class RankRestoreTest extends RestoreTestBase {
+
+    public RankRestoreTest() {
+        super(StreamExecRank.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                RankTestPrograms.RANK_TEST_APPEND_FAST_STRATEGY,
+                RankTestPrograms.RANK_TEST_RETRACT_STRATEGY,
+                RankTestPrograms.RANK_TEST_UPDATE_FAST_STRATEGY,
+                RankTestPrograms.RANK_N_TEST);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java
new file mode 100644
index 00000000000..979e2adbb52
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import static org.apache.flink.table.factories.TestFormatFactory.CHANGELOG_MODE;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecRank}. */
+public class RankTestPrograms {
+
+    static final TableTestProgram RANK_TEST_APPEND_FAST_STRATEGY =
+            getTableTestProgram(
+                    "rank-test-append-fast-strategy",
+                    "I",
+                    new String[] {
+                        "+I[2, a, 1]",
+                        "+I[4, b, 1]",
+                        "+I[6, c, 1]",
+                        "-U[2, a, 1]",
+                        "+U[1, a, 1]",
+                        "-U[4, b, 1]",
+                        "+U[3, b, 1]",
+                        "-U[6, c, 1]",
+                        "+U[5, c, 1]"
+                    },
+                    new String[] {"+I[4, d, 1]", "+I[3, e, 1]"});
+
+    static final TableTestProgram RANK_TEST_RETRACT_STRATEGY =
+            getTableTestProgram(
+                    "rank-test-retract-strategy",
+                    "I,UA,UB",
+                    new String[] {
+                        "+I[2, a, 1]",
+                        "+I[4, b, 1]",
+                        "+I[6, c, 1]",
+                        "-D[2, a, 1]",
+                        "+I[1, a, 1]",
+                        "-D[4, b, 1]",
+                        "+I[3, b, 1]",
+                        "-D[6, c, 1]",
+                        "+I[5, c, 1]"
+                    },
+                    new String[] {"+I[4, d, 1]", "+I[3, e, 1]"});
+
+    static final TableTestProgram RANK_TEST_UPDATE_FAST_STRATEGY =
+            TableTestProgram.of("rank-test-update-fast-strategy", "validates rank exec node")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("MyTable")
+                                    .addSchema(
+                                            "a INT primary key not enforced", "b VARCHAR", "c INT")
+                                    .addOption(CHANGELOG_MODE, "I")
+                                    .producedBeforeRestore(
+                                            Row.of(2, "a", 6),
+                                            Row.of(4, "b", 8),
+                                            Row.of(6, "c", 10),
+                                            Row.of(1, "a", 5),
+                                            Row.of(3, "b", 7),
+                                            Row.of(5, "c", 9))
+                                    .producedAfterRestore(Row.of(4, "d", 7), Row.of(0, "a", 8))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a INT NOT NULL",
+                                            "b STRING",
+                                            "count_c BIGINT NOT NULL",
+                                            "row_num BIGINT NOT NULL")
+                                    .consumedBeforeRestore(
+                                            "+I[2, a, 1, 1]",
+                                            "+I[4, b, 1, 1]",
+                                            "+I[6, c, 1, 1]",
+                                            "-U[2, a, 1, 1]",
+                                            "+U[1, a, 1, 1]",
+                                            "+I[2, a, 1, 2]",
+                                            "-U[4, b, 1, 1]",
+                                            "+U[3, b, 1, 1]",
+                                            "+I[4, b, 1, 2]",
+                                            "-U[6, c, 1, 1]",
+                                            "+U[5, c, 1, 1]",
+                                            "+I[6, c, 1, 2]")
+                                    .consumedAfterRestore(
+                                            new String[] {
+                                                "+I[4, d, 1, 1]",
+                                                "-U[1, a, 1, 1]",
+                                                "+U[0, a, 1, 1]",
+                                                "-U[2, a, 1, 2]",
+                                                "+U[1, a, 1, 2]",
+                                                "+I[2, a, 1, 3]"
+                                            })
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT * FROM ("
+                                    + "SELECT a, b, count_c, ROW_NUMBER() "
+                                    + "   OVER (PARTITION BY b ORDER BY count_c DESC, a ASC) AS row_num"
+                                    + "   FROM ("
+                                    + "       SELECT a, b, COUNT(*) AS count_c"
+                                    + "       FROM MyTable"
+                                    + "       GROUP BY a, b"
+                                    + "   )"
+                                    + ") WHERE row_num <= 10")
+                    .build();
+
+    private static TableTestProgram getTableTestProgram(
+            final String name,
+            final String changelogMode,
+            final String[] resultsBeforeRestore,
+            final String[] resultsAfterRestore) {
+        return TableTestProgram.of(name, "validates rank exec node")
+                .setupTableSource(
+                        SourceTestStep.newBuilder("MyTable")
+                                .addSchema("a INT", "b VARCHAR", "c INT primary key not enforced")
+                                .addOption(CHANGELOG_MODE, changelogMode)
+                                .producedBeforeRestore(
+                                        Row.of(2, "a", 6),
+                                        Row.of(4, "b", 8),
+                                        Row.of(6, "c", 10),
+                                        Row.of(1, "a", 5),
+                                        Row.of(3, "b", 7),
+                                        Row.of(5, "c", 9))
+                                .producedAfterRestore(Row.of(4, "d", 7), Row.of(3, "e", 8))
+                                .build())
+                .setupTableSink(
+                        SinkTestStep.newBuilder("sink_t")
+                                .addSchema("a INT", "b VARCHAR", "c BIGINT")
+                                .consumedBeforeRestore(resultsBeforeRestore)
+                                .consumedAfterRestore(resultsAfterRestore)
+                                .build())
+                .runSql(
+                        "insert into `sink_t` select * from "
+                                + "(select a, b, row_number() over(partition by b order by c) as c from MyTable)"
+                                + " where c = 1")
+                .build();
+    }
+
+    static final TableTestProgram RANK_N_TEST =
+            TableTestProgram.of("rank-n-test", "validates rank node can handle multiple outputs")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("MyTable1")
+                                    .addSchema("a STRING", "b INT", "c INT", "t as proctime()")
+                                    .producedBeforeRestore(
+                                            Row.of("book", 1, 12),
+                                            Row.of("book", 2, 19),
+                                            Row.of("book", 4, 11),
+                                            Row.of("fruit", 4, 33))
+                                    .producedAfterRestore(
+                                            Row.of("cereal", 6, 21),
+                                            Row.of("cereal", 7, 23),
+                                            Row.of("apple", 8, 31),
+                                            Row.of("fruit", 9, 41))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("result1")
+                                    .addSchema("a varchar", "b int", "c bigint")
+                                    .consumedBeforeRestore(
+                                            "+I[book, 1, 1]", "+I[book, 2, 2]", "+I[fruit, 4, 1]")
+                                    .consumedAfterRestore(
+                                            "+I[cereal, 6, 1]",
+                                            "+I[cereal, 7, 2]",
+                                            "+I[apple, 8, 1]",
+                                            "+I[fruit, 9, 2]")
+                                    .build())
+                    .runSql(
+                            "insert into `result1` select * from "
+                                    + "(select a, b, row_number() over(partition by a order by t asc) as c from MyTable1)"
+                                    + " where c <= 2")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json
new file mode 100644
index 00000000000..11f4b21e9e3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json
@@ -0,0 +1,301 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 17,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            }, {
+              "name" : "t",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` VARCHAR(2147483647), `b` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` VARCHAR(2147483647), `b` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "INT"
+      }, {
+        "name" : "$2",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, PROCTIME() AS $2])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "INT"
+      }, {
+        "name" : "$2",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-rank_1",
+    "configuration" : {
+      "table.exec.rank.topn-cache-size" : "10000"
+    },
+    "rankType" : "ROW_NUMBER",
+    "partition" : {
+      "fields" : [ 0 ]
+    },
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 2,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "rankRange" : {
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 2
+    },
+    "rankStrategy" : {
+      "type" : "AppendFast"
+    },
+    "outputRowNumber" : true,
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "rankState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "b",
+        "fieldType" : "INT"
+      }, {
+        "name" : "$2",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT NOT NULL"
+      } ]
+    },
+    "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[$2 ASC], select=[a, b, $2, w0$o0])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT, `w0$o0` BIGINT NOT NULL>",
+    "description" : "Calc(select=[a, b, w0$o0])"
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`result1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0, 2 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT, `w0$o0` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.result1], fields=[a, b, w0$o0])"
+  } ],
+  "edges" : [ {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata
new file mode 100644
index 00000000000..8c9ecaab640
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json
similarity index 65%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json
index 7209ea0be4a..790da11a5b3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json
@@ -1,5 +1,5 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
     "id" : 1,
     "type" : "stream-exec-table-source-scan_1",
@@ -10,54 +10,30 @@
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
             } ],
-            "watermarkSpecs" : [ ]
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_c",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "c" ]
+            }
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "Calc(select=[a, b])"
-  }, {
-    "id" : 3,
     "type" : "stream-exec-exchange_1",
     "inputProperties" : [ {
       "requiredDistribution" : {
@@ -67,10 +43,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
     "description" : "Exchange(distribution=[hash[b]])"
   }, {
-    "id" : 4,
+    "id" : 3,
     "type" : "stream-exec-rank_1",
     "configuration" : {
       "table.exec.rank.topn-cache-size" : "10000"
@@ -81,19 +57,20 @@
     },
     "orderBy" : {
       "fields" : [ {
-        "index" : 0,
+        "index" : 2,
         "isAscending" : true,
         "nullIsLast" : false
       } ]
     },
     "rankRange" : {
-      "type" : "Variable",
-      "endIndex" : 0
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 1
     },
     "rankStrategy" : {
       "type" : "AppendFast"
     },
-    "outputRowNumber" : true,
+    "outputRowNumber" : false,
     "generateUpdateBefore" : true,
     "state" : [ {
       "index" : 0,
@@ -107,18 +84,22 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
+    "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[c ASC], select=[a, b, c])"
   }, {
-    "id" : 5,
+    "id" : 4,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
-      "type" : "BIGINT"
+      "type" : "INT"
     }, {
       "kind" : "INPUT_REF",
-      "inputIndex" : 2,
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 1,
       "type" : "BIGINT NOT NULL"
     } ],
     "condition" : null,
@@ -129,10 +110,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Calc(select=[a, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>",
+    "description" : "Calc(select=[a, b, 1 AS $2])"
   }, {
-    "id" : 6,
+    "id" : 5,
     "type" : "stream-exec-sink_1",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
@@ -143,28 +124,27 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
               "dataType" : "BIGINT"
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
     "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 1 ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -172,8 +152,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, $2])"
   } ],
   "edges" : [ {
     "source" : 1,
@@ -203,12 +183,5 @@
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata
new file mode 100644
index 00000000000..9d5f2be6c14
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json
similarity index 59%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json
index 7209ea0be4a..e79e6fc6cbc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json
@@ -1,7 +1,7 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 6,
     "type" : "stream-exec-table-source-scan_1",
     "scanTableSource" : {
       "table" : {
@@ -10,54 +10,30 @@
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
             } ],
-            "watermarkSpecs" : [ ]
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_c",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "c" ]
+            }
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
     "inputProperties" : [ ]
   }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "Calc(select=[a, b])"
-  }, {
-    "id" : 3,
+    "id" : 7,
     "type" : "stream-exec-exchange_1",
     "inputProperties" : [ {
       "requiredDistribution" : {
@@ -67,10 +43,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
     "description" : "Exchange(distribution=[hash[b]])"
   }, {
-    "id" : 4,
+    "id" : 8,
     "type" : "stream-exec-rank_1",
     "configuration" : {
       "table.exec.rank.topn-cache-size" : "10000"
@@ -81,19 +57,20 @@
     },
     "orderBy" : {
       "fields" : [ {
-        "index" : 0,
+        "index" : 2,
         "isAscending" : true,
         "nullIsLast" : false
       } ]
     },
     "rankRange" : {
-      "type" : "Variable",
-      "endIndex" : 0
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 1
     },
     "rankStrategy" : {
-      "type" : "AppendFast"
+      "type" : "Retract"
     },
-    "outputRowNumber" : true,
+    "outputRowNumber" : false,
     "generateUpdateBefore" : true,
     "state" : [ {
       "index" : 0,
@@ -107,18 +84,22 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT 
NULL>",
-    "description" : "Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], 
select=[a, b, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>",
+    "description" : "Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[c ASC], 
select=[a, b, c])"
   }, {
-    "id" : 5,
+    "id" : 9,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
-      "type" : "BIGINT"
+      "type" : "INT"
     }, {
       "kind" : "INPUT_REF",
-      "inputIndex" : 2,
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 1,
       "type" : "BIGINT NOT NULL"
     } ],
     "condition" : null,
@@ -129,10 +110,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Calc(select=[a, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT 
NULL>",
+    "description" : "Calc(select=[a, b, 1 AS $2])"
   }, {
-    "id" : 6,
+    "id" : 10,
     "type" : "stream-exec-sink_1",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
@@ -143,28 +124,27 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
               "dataType" : "BIGINT"
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
     "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 1 ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -172,43 +152,36 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, w0$o0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT 
NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, $2])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 6,
+    "target" : 7,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 3,
-    "target" : 4,
+    "source" : 7,
+    "target" : 8,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 4,
-    "target" : 5,
+    "source" : 8,
+    "target" : 9,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 5,
-    "target" : 6,
+    "source" : 9,
+    "target" : 10,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata
new file mode 100644
index 00000000000..4e542ae3a77
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json
similarity index 50%
rename from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
rename to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json
index 7209ea0be4a..ce4974b9939 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json
@@ -1,7 +1,7 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 11,
     "type" : "stream-exec-table-source-scan_1",
     "scanTableSource" : {
       "table" : {
@@ -10,43 +10,77 @@
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
               "dataType" : "INT NOT NULL"
             }, {
-              "name" : "c",
+              "name" : "b",
               "dataType" : "VARCHAR(2147483647)"
             }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
+              "name" : "c",
+              "dataType" : "INT"
             } ],
-            "watermarkSpecs" : [ ]
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
     "inputProperties" : [ ]
   }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
+    "id" : 12,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a, b]])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0, 1 ],
+    "aggCalls" : [ {
+      "name" : "count_c",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
     } ],
-    "condition" : null,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -54,10 +88,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "Calc(select=[a, b])"
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` 
BIGINT NOT NULL>",
+    "description" : "GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS 
count_c])"
   }, {
-    "id" : 3,
+    "id" : 14,
     "type" : "stream-exec-exchange_1",
     "inputProperties" : [ {
       "requiredDistribution" : {
@@ -67,10 +101,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` 
BIGINT NOT NULL>",
     "description" : "Exchange(distribution=[hash[b]])"
   }, {
-    "id" : 4,
+    "id" : 15,
     "type" : "stream-exec-rank_1",
     "configuration" : {
       "table.exec.rank.topn-cache-size" : "10000"
@@ -81,17 +115,23 @@
     },
     "orderBy" : {
       "fields" : [ {
+        "index" : 2,
+        "isAscending" : false,
+        "nullIsLast" : true
+      }, {
         "index" : 0,
         "isAscending" : true,
         "nullIsLast" : false
       } ]
     },
     "rankRange" : {
-      "type" : "Variable",
-      "endIndex" : 0
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 10
     },
     "rankStrategy" : {
-      "type" : "AppendFast"
+      "type" : "UpdateFast",
+      "primaryKeys" : [ 0, 1 ]
     },
     "outputRowNumber" : true,
     "generateUpdateBefore" : true,
@@ -107,32 +147,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT 
NULL>",
-    "description" : "Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], 
select=[a, b, w0$o0])"
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` 
BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>",
+    "description" : "Rank(strategy=[UpdateFastStrategy[0,1]], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[b], 
orderBy=[count_c DESC, a ASC], select=[a, b, count_c, w0$o0])"
   }, {
-    "id" : 5,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Calc(select=[a, w0$o0])"
-  }, {
-    "id" : 6,
+    "id" : 16,
     "type" : "stream-exec-sink_1",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
@@ -143,28 +161,30 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT NOT NULL"
             }, {
               "name" : "b",
-              "dataType" : "BIGINT"
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "count_c",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "row_num",
+              "dataType" : "BIGINT NOT NULL"
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
     "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0, 1 ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -172,43 +192,43 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, w0$o0])"
+    "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` 
BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, count_c, w0$o0])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 11,
+    "target" : 12,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 12,
+    "target" : 13,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 3,
-    "target" : 4,
+    "source" : 13,
+    "target" : 14,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 4,
-    "target" : 5,
+    "source" : 14,
+    "target" : 15,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 5,
-    "target" : 6,
+    "source" : 15,
+    "target" : 16,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata
new file mode 100644
index 00000000000..b3d7adb0e5e
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata
 differ

Reply via email to