This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new aa5766e257a [FLINK-33861] Implement restore tests for WindowRank node aa5766e257a is described below commit aa5766e257a8b40e15d08eafa1e005837694772b Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Mon Dec 18 19:44:40 2023 -0800 [FLINK-33861] Implement restore tests for WindowRank node --- .../nodes/exec/stream/WindowRankRestoreTest.java | 44 ++ .../nodes/exec/stream/WindowRankTestPrograms.java | 266 ++++++++ .../plan/window-rank-cumulate-tvf-min-top-n.json | 687 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 21448 bytes .../plan/window-rank-hop-tvf-min-top-n.json | 687 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 21929 bytes .../plan/window-rank-tumble-tvf-agg-max-top-n.json | 664 ++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 23994 bytes .../plan/window-rank-tumble-tvf-agg-min-top-n.json | 664 ++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 24000 bytes .../plan/window-rank-tumble-tvf-max-top-n.json | 685 ++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 21084 bytes .../plan/window-rank-tumble-tvf-min-top-n.json | 685 ++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 21084 bytes 14 files changed, 4382 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankRestoreTest.java new file mode 100644 index 00000000000..ddc47a8f624 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankRestoreTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecWindowRank}. */ +public class WindowRankRestoreTest extends RestoreTestBase { + + public WindowRankRestoreTest() { + super(StreamExecWindowRank.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + WindowRankTestPrograms.WINDOW_RANK_TUMBLE_TVF_MIN_TOP_N, + WindowRankTestPrograms.WINDOW_RANK_TUMBLE_TVF_AGG_MIN_TOP_N, + WindowRankTestPrograms.WINDOW_RANK_TUMBLE_TVF_MAX_TOP_N, + WindowRankTestPrograms.WINDOW_RANK_TUMBLE_TVF_AGG_MAX_TOP_N, + WindowRankTestPrograms.WINDOW_RANK_HOP_TVF_MIN_TOP_N, + WindowRankTestPrograms.WINDOW_RANK_CUMULATE_TVF_MIN_TOP_N); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankTestPrograms.java new file mode 100644 index 00000000000..bacbe648a9d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowRankTestPrograms.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowRank}. */ +public class WindowRankTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of("2020-04-15 08:00:05", new BigDecimal(4.00), "A", "supplier1"), + Row.of("2020-04-15 08:00:06", new BigDecimal(4.00), "C", "supplier2"), + Row.of("2020-04-15 08:00:07", new BigDecimal(2.00), "G", "supplier1"), + Row.of("2020-04-15 08:00:08", new BigDecimal(2.00), "B", "supplier3"), + Row.of("2020-04-15 08:00:09", new BigDecimal(5.00), "D", "supplier4"), + Row.of("2020-04-15 08:00:11", new BigDecimal(2.00), "B", "supplier3"), + Row.of("2020-04-15 08:00:13", new BigDecimal(1.00), "E", "supplier1"), + Row.of("2020-04-15 08:00:15", new BigDecimal(3.00), "H", "supplier2"), + Row.of("2020-04-15 08:00:17", new BigDecimal(6.00), "F", "supplier5") + }; + + static final Row[] AFTER_DATA = { + Row.of("2020-04-15 08:00:21", new BigDecimal(2.00), "B", "supplier7"), + Row.of("2020-04-15 08:00:23", new BigDecimal(1.00), "A", "supplier4"), + Row.of("2020-04-15 08:00:25", new BigDecimal(3.00), "C", "supplier3"), + Row.of("2020-04-15 08:00:28", new BigDecimal(6.00), "A", "supplier8") + }; + + static final SourceTestStep SOURCE = + SourceTestStep.newBuilder("bid_t") + .addSchema( + "ts STRING", + "price DECIMAL(10,2)", + "item STRING", + "supplier_id STRING", + "`bid_time` AS TO_TIMESTAMP(`ts`)", + "`proc_time` AS PROCTIME()", + "WATERMARK for `bid_time` AS `bid_time` - INTERVAL '1' SECOND") + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build(); + + static final String[] SINK_SCHEMA = { + "window_start TIMESTAMP(3)", + "window_end TIMESTAMP(3)", + "bid_time TIMESTAMP(3)", + "supplier_id STRING", + "price DECIMAL(10,2)", + "item STRING", + "row_num BIGINT" + }; + + static final String[] SINK_TVF_AGG_SCHEMA = { + "window_start TIMESTAMP(3)", + "window_end TIMESTAMP(3)", + "supplier_id STRING", + "total_price DECIMAL(10,2)", + "cnt BIGINT", + "row_num BIGINT" + }; + + static final String TUMBLE_TVF = + "TABLE(TUMBLE(TABLE bid_t, DESCRIPTOR(bid_time), INTERVAL '10' SECOND))"; + + static final String HOP_TVF = + "TABLE(HOP(TABLE bid_t, DESCRIPTOR(bid_time), INTERVAL '5' SECOND, INTERVAL '10' SECOND))"; + + static final String CUMULATE_TVF = + "TABLE(CUMULATE(TABLE bid_t, DESCRIPTOR(bid_time), INTERVAL '5' SECOND, INTERVAL '10' SECOND))"; + + static final String QUERY_TVF_TOP_N = + "INSERT INTO sink_t SELECT *\n" + + " FROM (\n" + + " SELECT\n" + + " window_start,\n" + + " window_end, \n" + + " bid_time,\n" + + " supplier_id,\n" + + " price,\n" + + " item,\n" + + " ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price %s) AS row_num\n" + + " FROM %s\n" // Window TVF + + " ) WHERE row_num <= 3"; // row_num must be greater than 1 + + static final String QUERY_TVF_AGG_TOP_N = + "INSERT INTO sink_t SELECT *\n" + + " FROM (\n" + + " SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price %s) as row_num\n" + + " FROM (\n" + + " SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt\n" + + " FROM %s\n" // Window TVF + + " GROUP BY window_start, window_end, supplier_id\n" + + " )\n" + + " ) WHERE row_num <= 3"; // row_num must be greater than 1 + + static final TableTestProgram WINDOW_RANK_TUMBLE_TVF_MIN_TOP_N = + TableTestProgram.of( + "window-rank-tumble-tvf-min-top-n", + "validates window min top-n follows after tumbling window") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:07, supplier1, 2.00, G, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:08, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:05, supplier1, 4.00, A, 3]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:13, supplier1, 1.00, E, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:11, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:15, supplier2, 3.00, H, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:23, supplier4, 1.00, A, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:21, supplier7, 2.00, B, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:25, supplier3, 3.00, C, 3]") + .build()) + .runSql(String.format(QUERY_TVF_TOP_N, "ASC", TUMBLE_TVF)) + .build(); + + static final TableTestProgram WINDOW_RANK_TUMBLE_TVF_AGG_MIN_TOP_N = + TableTestProgram.of( + "window-rank-tumble-tvf-agg-min-top-n", + "validates window min top-n with tumbling window follows after window aggregation") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_TVF_AGG_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier3, 2.00, 1, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier2, 4.00, 1, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier4, 5.00, 1, 3]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier1, 1.00, 1, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier3, 2.00, 1, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier2, 3.00, 1, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier4, 1.00, 1, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier7, 2.00, 1, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier3, 3.00, 1, 3]") + .build()) + .runSql(String.format(QUERY_TVF_AGG_TOP_N, "ASC", TUMBLE_TVF)) + .build(); + + static final TableTestProgram WINDOW_RANK_TUMBLE_TVF_MAX_TOP_N = + TableTestProgram.of( + "window-rank-tumble-tvf-max-top-n", + "validates window max top-n follows after tumbling window") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:09, supplier4, 5.00, D, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:05, supplier1, 4.00, A, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:06, supplier2, 4.00, C, 3]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:17, supplier5, 6.00, F, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:15, supplier2, 3.00, H, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:11, supplier3, 2.00, B, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:28, supplier8, 6.00, A, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:25, supplier3, 3.00, C, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:21, supplier7, 2.00, B, 3]") + .build()) + .runSql(String.format(QUERY_TVF_TOP_N, "DESC", TUMBLE_TVF)) + .build(); + + static final TableTestProgram WINDOW_RANK_TUMBLE_TVF_AGG_MAX_TOP_N = + TableTestProgram.of( + "window-rank-tumble-tvf-agg-max-top-n", + "validates window max top-n with tumbling window follows after window aggregation") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_TVF_AGG_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier1, 6.00, 2, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier4, 5.00, 1, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, supplier2, 4.00, 1, 3]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier5, 6.00, 1, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier2, 3.00, 1, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, supplier3, 2.00, 1, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier8, 6.00, 1, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier3, 3.00, 1, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, supplier7, 2.00, 1, 3]") + .build()) + .runSql(String.format(QUERY_TVF_AGG_TOP_N, "DESC", TUMBLE_TVF)) + .build(); + + static final TableTestProgram WINDOW_RANK_HOP_TVF_MIN_TOP_N = + TableTestProgram.of( + "window-rank-hop-tvf-min-top-n", + "validates window min top-n follows after hop window") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:07, supplier1, 2.00, G, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:08, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:05, supplier1, 4.00, A, 3]", + "+I[2020-04-15T08:00:05, 2020-04-15T08:00:15, 2020-04-15T08:00:13, supplier1, 1.00, E, 1]", + "+I[2020-04-15T08:00:05, 2020-04-15T08:00:15, 2020-04-15T08:00:07, supplier1, 2.00, G, 2]", + "+I[2020-04-15T08:00:05, 2020-04-15T08:00:15, 2020-04-15T08:00:08, supplier3, 2.00, B, 3]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:13, supplier1, 1.00, E, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:11, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:15, supplier2, 3.00, H, 3]", + "+I[2020-04-15T08:00:15, 2020-04-15T08:00:25, 2020-04-15T08:00:23, supplier4, 1.00, A, 1]", + "+I[2020-04-15T08:00:15, 2020-04-15T08:00:25, 2020-04-15T08:00:21, supplier7, 2.00, B, 2]", + "+I[2020-04-15T08:00:15, 2020-04-15T08:00:25, 2020-04-15T08:00:15, supplier2, 3.00, H, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:23, supplier4, 1.00, A, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:21, supplier7, 2.00, B, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:25, supplier3, 3.00, C, 3]", + "+I[2020-04-15T08:00:25, 2020-04-15T08:00:35, 2020-04-15T08:00:25, supplier3, 3.00, C, 1]", + "+I[2020-04-15T08:00:25, 2020-04-15T08:00:35, 2020-04-15T08:00:28, supplier8, 6.00, A, 2]") + .build()) + .runSql(String.format(QUERY_TVF_TOP_N, "ASC", HOP_TVF)) + .build(); + + static final TableTestProgram WINDOW_RANK_CUMULATE_TVF_MIN_TOP_N = + TableTestProgram.of( + "window-rank-cumulate-tvf-min-top-n", + "validates window min top-n follows after cumulate window") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:07, supplier1, 2.00, G, 1]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:08, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00, 2020-04-15T08:00:10, 2020-04-15T08:00:05, supplier1, 4.00, A, 3]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:15, 2020-04-15T08:00:13, supplier1, 1.00, E, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:15, 2020-04-15T08:00:11, supplier3, 2.00, B, 2]") + .consumedAfterRestore( + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:13, supplier1, 1.00, E, 1]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:11, supplier3, 2.00, B, 2]", + "+I[2020-04-15T08:00:10, 2020-04-15T08:00:20, 2020-04-15T08:00:15, supplier2, 3.00, H, 3]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:25, 2020-04-15T08:00:23, supplier4, 1.00, A, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:25, 2020-04-15T08:00:21, supplier7, 2.00, B, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:23, supplier4, 1.00, A, 1]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:21, supplier7, 2.00, B, 2]", + "+I[2020-04-15T08:00:20, 2020-04-15T08:00:30, 2020-04-15T08:00:25, supplier3, 3.00, C, 3]") + .build()) + .runSql(String.format(QUERY_TVF_TOP_N, "ASC", CUMULATE_TVF)) + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/plan/window-rank-cumulate-tvf-min-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/plan/window-rank-cumulate-tvf-min-top-n.json new file mode 100644 index 00000000000..c6f8bc6e896 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/plan/window-rank-cumulate-tvf-min-top-n.json @@ -0,0 +1,687 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 52, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` DECIMAL(10, 2), `item` VARCHAR(2147483647), `supplier_id` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t]], fields=[ts, price, item, supplier_id])", + "inputProperties" : [ ] + }, { + "id" : 53, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, item, supplier_id, TO_TIMESTAMP(ts) AS bid_time, PROCTIME() AS proc_time])" + }, { + "id" : 54, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 55, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT10S", + "step" : "PT5S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 4, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[CUMULATE(time_col=[bid_time], max_size=[10 s], step=[5 s])])" + }, { + "id" : 56, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Calc(select=[price, item, supplier_id, bid_time, window_start, window_end])" + }, { + "id" : 57, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "id" : 58, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT10S", + "step" : "PT5S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 4, + "windowEnd" : 5, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "WindowRank(window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[10 s], step=[5 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price ASC], select=[price, item, supplier_id, bid_time, window_start, window_end, row_num])" + }, { + "id" : 59, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + }, { + "id" : 60, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "bid_time", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 6 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + } ], + "edges" : [ { + "source" : 52, + "target" : 53, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 53, + "target" : 54, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 54, + "target" : 55, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 55, + "target" : 56, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 56, + "target" : 57, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 57, + "target" : 58, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 58, + "target" : 59, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 59, + "target" : 60, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/savepoint/_metadata new file mode 100644 index 00000000000..24afda8be24 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-cumulate-tvf-min-top-n/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/plan/window-rank-hop-tvf-min-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/plan/window-rank-hop-tvf-min-top-n.json new file mode 100644 index 00000000000..4662a07d032 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/plan/window-rank-hop-tvf-min-top-n.json @@ -0,0 +1,687 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 43, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` DECIMAL(10, 2), `item` VARCHAR(2147483647), `supplier_id` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t]], fields=[ts, price, item, supplier_id])", + "inputProperties" : [ ] + }, { + "id" : 44, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, item, supplier_id, TO_TIMESTAMP(ts) AS bid_time, PROCTIME() AS proc_time])" + }, { + "id" : 45, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 46, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "HoppingWindow", + "size" : "PT10S", + "slide" : "PT5S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 4, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[HOP(time_col=[bid_time], size=[10 s], slide=[5 s])])" + }, { + "id" : 47, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Calc(select=[price, item, supplier_id, bid_time, window_start, window_end])" + }, { + "id" : 48, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "id" : 49, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "HoppingWindow", + "size" : "PT10S", + "slide" : "PT5S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 4, + "windowEnd" : 5, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "WindowRank(window=[HOP(win_start=[window_start], win_end=[window_end], size=[10 s], slide=[5 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price ASC], select=[price, item, supplier_id, bid_time, window_start, window_end, row_num])" + }, { + "id" : 50, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + }, { + "id" : 51, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "bid_time", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 6 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + } ], + "edges" : [ { + "source" : 43, + "target" : 44, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 44, + "target" : 45, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 45, + "target" : 46, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 46, + "target" : 47, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 47, + "target" : 48, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 48, + "target" : 49, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 49, + "target" : 50, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 50, + "target" : 51, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/savepoint/_metadata new file mode 100644 index 00000000000..bdc9b5bd75b Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-hop-tvf-min-top-n/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/plan/window-rank-tumble-tvf-agg-max-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/plan/window-rank-tumble-tvf-agg-max-top-n.json new file mode 100644 index 00000000000..da3e30b1c43 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/plan/window-rank-tumble-tvf-agg-max-top-n.json @@ -0,0 +1,664 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 31, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 1 ], [ 3 ], [ 0 ] ], + "producedType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t, project=[price, supplier_id, ts], metadata=[]]], fields=[price, supplier_id, ts])", + "inputProperties" : [ ] + }, { + "id" : 32, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `bid_time` TIMESTAMP(3)>", + "description" : "Calc(select=[price, supplier_id, TO_TIMESTAMP(ts) AS bid_time])" + }, { + "id" : 33, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 34, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[supplier_id, price, bid_time])" + }, { + "id" : 35, + "type" : "stream-exec-local-window-aggregate_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "price", + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "DECIMAL(38, 2)" + }, { + "name" : "cnt", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 2, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `sum$0` DECIMAL(38, 2), `count1$1` BIGINT, `$slice_end` BIGINT>", + "description" : "LocalWindowAggregate(groupBy=[supplier_id], window=[TUMBLE(time_col=[bid_time], size=[10 s])], select=[supplier_id, SUM(price) AS sum$0, COUNT(*) AS count1$1, slice_end('w$) AS $slice_end])" + }, { + "id" : 36, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `sum$0` DECIMAL(38, 2), `count1$1` BIGINT, `$slice_end` BIGINT>", + "description" : "Exchange(distribution=[hash[supplier_id]])" + }, { + "id" : 37, + "type" : "stream-exec-global-window-aggregate_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "price", + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "DECIMAL(38, 2)" + }, { + "name" : "cnt", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "windowing" : { + "strategy" : "SliceAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "sliceEnd" : 3, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "fields" : [ { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "GlobalWindowAggregate(groupBy=[supplier_id], window=[TUMBLE(slice_end=[$slice_end], size=[10 s])], select=[supplier_id, SUM(sum$0) AS price, COUNT(count1$1) AS cnt, start('w$) AS window_start, end('w$) AS window_end])" + }, { + "id" : 38, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(38, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL>", + "description" : "Calc(select=[window_start, window_end, supplier_id, price, cnt])" + }, { + "id" : 39, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL>", + "description" : "Exchange(distribution=[single])" + }, { + "id" : 40, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 3, + "isAscending" : false, + "nullIsLast" : true + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 0, + "windowEnd" : 1, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>", + "description" : "WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price DESC], select=[window_start, window_end, supplier_id, price, cnt, w0$o0])" + }, { + "id" : 41, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "DECIMAL(38, 2)" + } ], + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3), `window_end` TIMESTAMP(3), `supplier_id` VARCHAR(2147483647), `total_price` DECIMAL(10, 2), `cnt` BIGINT, `row_num` BIGINT>", + "description" : "Calc(select=[CAST(window_start AS TIMESTAMP(3)) AS window_start, CAST(window_end AS TIMESTAMP(3)) AS window_end, supplier_id, CAST(price AS DECIMAL(10, 2)) AS total_price, CAST(cnt AS BIGINT) AS cnt, CAST(w0$o0 AS BIGINT) AS row_num])" + }, { + "id" : 42, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "total_price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 5 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3), `window_end` TIMESTAMP(3), `supplier_id` VARCHAR(2147483647), `total_price` DECIMAL(10, 2), `cnt` BIGINT, `row_num` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, supplier_id, total_price, cnt, row_num])" + } ], + "edges" : [ { + "source" : 31, + "target" : 32, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 32, + "target" : 33, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 33, + "target" : 34, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 34, + "target" : 35, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 35, + "target" : 36, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 36, + "target" : 37, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 37, + "target" : 38, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 38, + "target" : 39, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 39, + "target" : 40, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 40, + "target" : 41, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 41, + "target" : 42, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/savepoint/_metadata new file mode 100644 index 00000000000..95d1000a44b Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-max-top-n/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/plan/window-rank-tumble-tvf-agg-min-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/plan/window-rank-tumble-tvf-agg-min-top-n.json new file mode 100644 index 00000000000..4ab7af3da5e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/plan/window-rank-tumble-tvf-agg-min-top-n.json @@ -0,0 +1,664 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 10, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 1 ], [ 3 ], [ 0 ] ], + "producedType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t, project=[price, supplier_id, ts], metadata=[]]], fields=[price, supplier_id, ts])", + "inputProperties" : [ ] + }, { + "id" : 11, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`price` DECIMAL(10, 2), `supplier_id` VARCHAR(2147483647), `bid_time` TIMESTAMP(3)>", + "description" : "Calc(select=[price, supplier_id, TO_TIMESTAMP(ts) AS bid_time])" + }, { + "id" : 12, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 13, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[supplier_id, price, bid_time])" + }, { + "id" : 14, + "type" : "stream-exec-local-window-aggregate_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "price", + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "DECIMAL(38, 2)" + }, { + "name" : "cnt", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 2, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `sum$0` DECIMAL(38, 2), `count1$1` BIGINT, `$slice_end` BIGINT>", + "description" : "LocalWindowAggregate(groupBy=[supplier_id], window=[TUMBLE(time_col=[bid_time], size=[10 s])], select=[supplier_id, SUM(price) AS sum$0, COUNT(*) AS count1$1, slice_end('w$) AS $slice_end])" + }, { + "id" : 15, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `sum$0` DECIMAL(38, 2), `count1$1` BIGINT, `$slice_end` BIGINT>", + "description" : "Exchange(distribution=[hash[supplier_id]])" + }, { + "id" : 16, + "type" : "stream-exec-global-window-aggregate_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "price", + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "DECIMAL(38, 2)" + }, { + "name" : "cnt", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "windowing" : { + "strategy" : "SliceAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "sliceEnd" : 3, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "fields" : [ { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "outputType" : "ROW<`supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "GlobalWindowAggregate(groupBy=[supplier_id], window=[TUMBLE(slice_end=[$slice_end], size=[10 s])], select=[supplier_id, SUM(sum$0) AS price, COUNT(count1$1) AS cnt, start('w$) AS window_start, end('w$) AS window_end])" + }, { + "id" : 17, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(38, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL>", + "description" : "Calc(select=[window_start, window_end, supplier_id, price, cnt])" + }, { + "id" : 18, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL>", + "description" : "Exchange(distribution=[single])" + }, { + "id" : 19, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 3, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 0, + "windowEnd" : 1, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `supplier_id` VARCHAR(2147483647), `price` DECIMAL(38, 2), `cnt` BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>", + "description" : "WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price ASC], select=[window_start, window_end, supplier_id, price, cnt, w0$o0])" + }, { + "id" : 20, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "DECIMAL(38, 2)" + } ], + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3), `window_end` TIMESTAMP(3), `supplier_id` VARCHAR(2147483647), `total_price` DECIMAL(10, 2), `cnt` BIGINT, `row_num` BIGINT>", + "description" : "Calc(select=[CAST(window_start AS TIMESTAMP(3)) AS window_start, CAST(window_end AS TIMESTAMP(3)) AS window_end, supplier_id, CAST(price AS DECIMAL(10, 2)) AS total_price, CAST(cnt AS BIGINT) AS cnt, CAST(w0$o0 AS BIGINT) AS row_num])" + }, { + "id" : 21, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "total_price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 5 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3), `window_end` TIMESTAMP(3), `supplier_id` VARCHAR(2147483647), `total_price` DECIMAL(10, 2), `cnt` BIGINT, `row_num` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, supplier_id, total_price, cnt, row_num])" + } ], + "edges" : [ { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/savepoint/_metadata new file mode 100644 index 00000000000..dc6911bc738 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-agg-min-top-n/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/plan/window-rank-tumble-tvf-max-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/plan/window-rank-tumble-tvf-max-top-n.json new file mode 100644 index 00000000000..1d7f80b68c4 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/plan/window-rank-tumble-tvf-max-top-n.json @@ -0,0 +1,685 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 22, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` DECIMAL(10, 2), `item` VARCHAR(2147483647), `supplier_id` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t]], fields=[ts, price, item, supplier_id])", + "inputProperties" : [ ] + }, { + "id" : 23, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, item, supplier_id, TO_TIMESTAMP(ts) AS bid_time, PROCTIME() AS proc_time])" + }, { + "id" : 24, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 25, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 4, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[TUMBLE(time_col=[bid_time], size=[10 s])])" + }, { + "id" : 26, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Calc(select=[price, item, supplier_id, bid_time, window_start, window_end])" + }, { + "id" : 27, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "id" : 28, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : false, + "nullIsLast" : true + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 4, + "windowEnd" : 5, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price DESC], select=[price, item, supplier_id, bid_time, window_start, window_end, row_num])" + }, { + "id" : 29, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + }, { + "id" : 30, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "bid_time", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 6 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + } ], + "edges" : [ { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 23, + "target" : 24, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 24, + "target" : 25, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 25, + "target" : 26, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 26, + "target" : 27, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 27, + "target" : 28, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 28, + "target" : 29, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 29, + "target" : 30, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/savepoint/_metadata new file mode 100644 index 00000000000..1427b6651cc Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-max-top-n/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/plan/window-rank-tumble-tvf-min-top-n.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/plan/window-rank-tumble-tvf-min-top-n.json new file mode 100644 index 00000000000..b654217269b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/plan/window-rank-tumble-tvf-min-top-n.json @@ -0,0 +1,685 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`bid_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "bid_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`bid_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` DECIMAL(10, 2), `item` VARCHAR(2147483647), `supplier_id` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, bid_t]], fields=[ts, price, item, supplier_id])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, item, supplier_id, TO_TIMESTAMP(ts) AS bid_time, PROCTIME() AS proc_time])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[bid_time], watermark=[(bid_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 4, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 4, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proc_time", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[TUMBLE(time_col=[bid_time], size=[10 s])])" + }, { + "id" : 5, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Calc(select=[price, item, supplier_id, bid_time, window_start, window_end])" + }, { + "id" : 6, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "id" : 7, + "type" : "stream-exec-window-rank_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "rankType" : "ROW_NUMBER", + "partitionSpec" : { + "fields" : [ ] + }, + "sortSpec" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "outputRowNumber" : true, + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT10S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 4, + "windowEnd" : 5, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[], orderBy=[price ASC], select=[price, item, supplier_id, bid_time, window_start, window_end, row_num])" + }, { + "id" : 8, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + }, { + "id" : 9, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "bid_time", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "supplier_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 6 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "bid_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "supplier_id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "DECIMAL(10, 2)" + }, { + "name" : "item", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_num", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, bid_time, supplier_id, price, item, row_num])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/savepoint/_metadata new file mode 100644 index 00000000000..74e94010491 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-window-rank_1/window-rank-tumble-tvf-min-top-n/savepoint/_metadata differ