This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 34ec781ac7547d376d09854983d169a3aca5130f Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Wed Nov 15 21:04:39 2023 -0800 [FLINK-33564] Implement restore tests for GroupWindowAggregate node --- .../flink/table/test/program/TableTestStep.java | 5 + .../GroupWindowAggregateEventTimeRestoreTest.java | 41 ++ .../GroupWindowAggregateProcTimeRestoreTest.java | 42 ++ .../stream/GroupWindowAggregateTestPrograms.java | 248 ++++++++++ .../plan/nodes/exec/testutils/RestoreTestBase.java | 93 +++- .../group-window-aggregate-hop-event-time.json | 328 +++++++++++++ .../savepoint/_metadata | Bin 0 -> 12035 bytes .../plan/group-window-aggregate-hop-proc-time.json | 414 +++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 11660 bytes .../group-window-aggregate-session-event-time.json | 326 +++++++++++++ .../savepoint/_metadata | Bin 0 -> 12340 bytes .../group-window-aggregate-session-proc-time.json | 412 +++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 11941 bytes .../group-window-aggregate-tumble-event-time.json | 511 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 22303 bytes .../group-window-aggregate-tumble-proc-time.json | 478 +++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 18217 bytes 17 files changed, 2874 insertions(+), 24 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java index 1d0207f7126..d1ff6b8d018 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java @@ -99,6 +99,11 @@ public abstract class TableTestStep implements TestStep { return (SpecificBuilder) this; } + public SpecificBuilder addSchema(List<String> schemaComponents) { + this.schemaComponents.addAll(schemaComponents); + return (SpecificBuilder) this; + } + /** * Unless the test requires a very specific configuration, try to avoid calling this method * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java new file mode 100644 index 00000000000..b583416f530 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecGroupWindowAggregate}. */ +public class GroupWindowAggregateEventTimeRestoreTest extends RestoreTestBase { + + public GroupWindowAggregateEventTimeRestoreTest() { + super(StreamExecGroupWindowAggregate.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + GroupWindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_EVENT_TIME, + GroupWindowAggregateTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME, + GroupWindowAggregateTestPrograms.GROUP_SESSION_WINDOW_EVENT_TIME); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java new file mode 100644 index 00000000000..f33fa80003c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecGroupWindowAggregate}. */ +public class GroupWindowAggregateProcTimeRestoreTest extends RestoreTestBase { + + public GroupWindowAggregateProcTimeRestoreTest() { + super(StreamExecGroupWindowAggregate.class, AfterRestoreSource.INFINITE); + } + + @Override + public List<TableTestProgram> programs() { + + return Arrays.asList( + GroupWindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_PROC_TIME, + GroupWindowAggregateTestPrograms.GROUP_HOP_WINDOW_PROC_TIME, + GroupWindowAggregateTestPrograms.GROUP_SESSION_WINDOW_PROC_TIME); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java new file mode 100644 index 00000000000..03f91f47efe --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecGroupWindowAggregate}. */ +public class GroupWindowAggregateTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"), + Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), + Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), + Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"), + Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), + // out of order + Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"), + Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"), + // late event + Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"), + Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"), + Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null), + Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b") + }; + + static final Row[] AFTER_DATA = { + Row.of("2020-10-10 00:00:41", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"), + Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"), + Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"), + Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d") + }; + + static final SourceTestStep SOURCE = + SourceTestStep.newBuilder("source_t") + .addSchema( + "ts STRING", + "a_int INT", + "b_double DOUBLE", + "c_float FLOAT", + "d_bigdec DECIMAL(10, 2)", + "`comment` STRING", + "name STRING", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "`proctime` AS PROCTIME()", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND") + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build(); + + static final TableTestProgram GROUP_TUMBLE_WINDOW_EVENT_TIME = + TableTestProgram.of( + "group-window-aggregate-tumble-event-time", + "validates group by using tumbling window with event time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING", + "window_start TIMESTAMP(3)", + "window_end TIMESTAMP(3)", + "cnt BIGINT", + "sum_int INT", + "distinct_cnt BIGINT") + .consumedBeforeRestore( + "+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]", + "+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]", + "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]", + "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]") + .consumedAfterRestore( + "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 1]", + "+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 7, 0]", + "+I[a, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 10, 1]", + "+I[c, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 12, 1]", + "+I[d, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 2, 24, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS window_start, " + + "TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS window_end, " + + "COUNT(*), " + + "SUM(a_int), " + + "COUNT(DISTINCT `comment`) " + + "FROM source_t " + + "GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)") + .build(); + + static final TableTestProgram GROUP_TUMBLE_WINDOW_PROC_TIME = + TableTestProgram.of( + "group-window-aggregate-tumble-proc-time", + "validates group by using tumbling window with processing time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING", + "cnt BIGINT", + "sum_int INT", + "distinct_cnt BIGINT") + .consumedBeforeRestore( + "+I[a, 6, 18, 3]", + "+I[null, 1, 7, 0]", + "+I[b, 4, 14, 3]") + .consumedAfterRestore( + "+I[a, 1, 10, 1]", "+I[c, 1, 12, 1]", "+I[d, 2, 24, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "COUNT(*), " + + "SUM(a_int), " + + "COUNT(DISTINCT `comment`) " + + "FROM source_t " + + "GROUP BY name, TUMBLE(proctime, INTERVAL '5' SECOND)") + .build(); + + static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME = + TableTestProgram.of( + "group-window-aggregate-hop-event-time", + "validates group by using hopping window with event time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "cnt BIGINT") + .consumedBeforeRestore( + "+I[a, 4]", + "+I[b, 2]", + "+I[a, 6]", + "+I[a, 1]", + "+I[b, 2]", + "+I[b, 1]", + "+I[b, 1]") + .consumedAfterRestore( + "+I[b, 1]", + "+I[null, 1]", + "+I[b, 1]", + "+I[null, 1]", + "+I[a, 1]", + "+I[d, 2]", + "+I[c, 1]", + "+I[a, 1]", + "+I[c, 1]", + "+I[d, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "COUNT(*) " + + "FROM source_t " + + "GROUP BY name, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)") + .build(); + + static final TableTestProgram GROUP_HOP_WINDOW_PROC_TIME = + TableTestProgram.of( + "group-window-aggregate-hop-proc-time", + "validates group by using hopping window with processing time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "cnt BIGINT") + .consumedBeforeRestore( + "+I[a, 6]", + "+I[b, 4]", + "+I[null, 1]", + "+I[a, 6]", + "+I[null, 1]", + "+I[b, 4]") + .consumedAfterRestore( + "+I[a, 1]", + "+I[d, 2]", + "+I[c, 1]", + "+I[a, 1]", + "+I[c, 1]", + "+I[d, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "COUNT(*) " + + "FROM source_t " + + "GROUP BY name, HOP(proctime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)") + .build(); + + static final TableTestProgram GROUP_SESSION_WINDOW_EVENT_TIME = + TableTestProgram.of( + "group-window-aggregate-session-event-time", + "validates group by using session window with event time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "cnt BIGINT") + .consumedBeforeRestore( + "+I[a, 4]", "+I[b, 2]", "+I[a, 1]", "+I[b, 1]") + .consumedAfterRestore( + "+I[null, 1]", + "+I[b, 1]", + "+I[a, 1]", + "+I[c, 1]", + "+I[d, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "COUNT(*) " + + "FROM source_t " + + "GROUP BY name, SESSION(rowtime, INTERVAL '3' SECOND)") + .build(); + + static final TableTestProgram GROUP_SESSION_WINDOW_PROC_TIME = + TableTestProgram.of( + "group-window-aggregate-session-proc-time", + "validates group by using session window with processing time") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "cnt BIGINT") + .consumedBeforeRestore("+I[a, 6]", "+I[null, 1]", "+I[b, 4]") + .consumedAfterRestore("+I[a, 1]", "+I[c, 1]", "+I[d, 2]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "name, " + + "COUNT(*) " + + "FROM source_t " + + "GROUP BY name, SESSION(proctime, INTERVAL '3' SECOND)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index 59a72d28fff..aed76d7a3c2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -81,9 +81,26 @@ import static org.assertj.core.api.Assertions.assertThat; public abstract class RestoreTestBase implements TableTestProgramRunner { private final Class<? extends ExecNode> execNodeUnderTest; + private final AfterRestoreSource afterRestoreSource; protected RestoreTestBase(Class<? extends ExecNode> execNodeUnderTest) { this.execNodeUnderTest = execNodeUnderTest; + this.afterRestoreSource = AfterRestoreSource.FINITE; + } + + protected RestoreTestBase( + Class<? extends ExecNode> execNodeUnderTest, AfterRestoreSource state) { + this.execNodeUnderTest = execNodeUnderTest; + this.afterRestoreSource = state; + } + + /** + * AfterRestoreSource defines the source behavior while running {@link + * RestoreTestBase#testRestore}. + */ + protected enum AfterRestoreSource { + FINITE, + INFINITE } @Override @@ -117,6 +134,31 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { supportedPrograms().stream().map(p -> Arguments.of(p, metadata))); } + private void registerSinkObserver( + final List<CompletableFuture<?>> futures, + final SinkTestStep sinkTestStep, + final boolean ignoreAfter) { + final CompletableFuture<Object> future = new CompletableFuture<>(); + futures.add(future); + final String tableName = sinkTestStep.name; + TestValuesTableFactory.registerLocalRawResultsObserver( + tableName, + (integer, strings) -> { + List<String> results = new ArrayList<>(); + results.addAll(sinkTestStep.getExpectedBeforeRestoreAsStrings()); + if (!ignoreAfter) { + results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings()); + } + final boolean shouldComplete = + CollectionUtils.isEqualCollection( + TestValuesTableFactory.getRawResultsAsStrings(tableName), + results); + if (shouldComplete) { + future.complete(null); + } + }); + } + /** * Execute this test to generate test files. Remember to be using the correct branch when * generating the test files. @@ -145,20 +187,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { final List<CompletableFuture<?>> futures = new ArrayList<>(); for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) { - final CompletableFuture<Object> future = new CompletableFuture<>(); - futures.add(future); - final String tableName = sinkTestStep.name; - TestValuesTableFactory.registerLocalRawResultsObserver( - tableName, - (integer, strings) -> { - final boolean shouldTakeSavepoint = - CollectionUtils.isEqualCollection( - TestValuesTableFactory.getRawResultsAsStrings(tableName), - sinkTestStep.getExpectedBeforeRestoreAsStrings()); - if (shouldTakeSavepoint) { - future.complete(null); - } - }); + registerSinkObserver(futures, sinkTestStep, true); final Map<String, String> options = new HashMap<>(); options.put("connector", "values"); options.put("disable-lookup", "true"); @@ -202,6 +231,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { .set( TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.IDENTIFIER); + for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) { final String id = TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore); final Map<String, String> options = new HashMap<>(); @@ -209,10 +239,18 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { options.put("data-id", id); options.put("disable-lookup", "true"); options.put("runtime-source", "NewSource"); + if (afterRestoreSource == AfterRestoreSource.INFINITE) { + options.put("terminating", "false"); + } sourceTestStep.apply(tEnv, options); } + final List<CompletableFuture<?>> futures = new ArrayList<>(); + for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) { + if (afterRestoreSource == AfterRestoreSource.INFINITE) { + registerSinkObserver(futures, sinkTestStep, false); + } final Map<String, String> options = new HashMap<>(); options.put("connector", "values"); options.put("disable-lookup", "true"); @@ -224,16 +262,23 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { final CompiledPlan compiledPlan = tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata))); - compiledPlan.execute().await(); - for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) { - assertThat(TestValuesTableFactory.getRawResultsAsStrings(sinkTestStep.name)) - .containsExactlyInAnyOrder( - Stream.concat( - sinkTestStep.getExpectedBeforeRestoreAsStrings() - .stream(), - sinkTestStep.getExpectedAfterRestoreAsStrings() - .stream()) - .toArray(String[]::new)); + + if (afterRestoreSource == AfterRestoreSource.INFINITE) { + final TableResult tableResult = compiledPlan.execute(); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + tableResult.getJobClient().get().cancel().get(); + } else { + compiledPlan.execute().await(); + for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) { + assertThat(TestValuesTableFactory.getRawResultsAsStrings(sinkTestStep.name)) + .containsExactlyInAnyOrder( + Stream.concat( + sinkTestStep.getExpectedBeforeRestoreAsStrings() + .stream(), + sinkTestStep.getExpectedAfterRestoreAsStrings() + .stream()) + .toArray(String[]::new)); + } } } diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json new file mode 100644 index 00000000000..626e43edc9b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json @@ -0,0 +1,328 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 8, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 0 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])", + "inputProperties" : [ ] + }, { + "id" : 9, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime])" + }, { + "id" : 10, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 1, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 11, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 12, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "SLIDING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT10S", + "slide" : "PT5S" + }, + "namedWindowProperties" : [ ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "GroupWindowAggregate(groupBy=[name], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[name, COUNT(*) AS EXPR$1])" + }, { + "id" : 13, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])" + } ], + "edges" : [ { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata new file mode 100644 index 00000000000..2272a348f91 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json new file mode 100644 index 00000000000..a3db2384730 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json @@ -0,0 +1,414 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 8, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 0 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])", + "inputProperties" : [ ] + }, { + "id" : 9, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "rowtime", + "fieldType" : "TIMESTAMP(3)" + } ] + }, + "description" : "Calc(select=[name, PROCTIME() AS proctime, TO_TIMESTAMP(ts) AS rowtime])" + }, { + "id" : 10, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 11, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[name, proctime])" + }, { + "id" : 12, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 13, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "SLIDING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT10S", + "slide" : "PT5S" + }, + "namedWindowProperties" : [ ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "GroupWindowAggregate(groupBy=[name], window=[SlidingGroupWindow('w$, proctime, 10000, 5000)], select=[name, COUNT(*) AS EXPR$1])" + }, { + "id" : 14, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])" + } ], + "edges" : [ { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata new file mode 100644 index 00000000000..728ac0ac490 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json new file mode 100644 index 00000000000..71cc9b36384 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json @@ -0,0 +1,326 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 14, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 0 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])", + "inputProperties" : [ ] + }, { + "id" : 15, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime])" + }, { + "id" : 16, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 1, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 17, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 18, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "SESSION", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "gap" : "PT3S" + }, + "namedWindowProperties" : [ ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "GroupWindowAggregate(groupBy=[name], window=[SessionGroupWindow('w$, rowtime, 3000)], select=[name, COUNT(*) AS EXPR$1])" + }, { + "id" : 19, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])" + } ], + "edges" : [ { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata new file mode 100644 index 00000000000..b5d23f6cf79 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json new file mode 100644 index 00000000000..bec28ec65bd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json @@ -0,0 +1,412 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 15, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 0 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])", + "inputProperties" : [ ] + }, { + "id" : 16, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "rowtime", + "fieldType" : "TIMESTAMP(3)" + } ] + }, + "description" : "Calc(select=[name, PROCTIME() AS proctime, TO_TIMESTAMP(ts) AS rowtime])" + }, { + "id" : 17, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 18, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[name, proctime])" + }, { + "id" : 19, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 20, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "SESSION", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "gap" : "PT3S" + }, + "namedWindowProperties" : [ ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "GroupWindowAggregate(groupBy=[name], window=[SessionGroupWindow('w$, proctime, 3000)], select=[name, COUNT(*) AS EXPR$1])" + }, { + "id" : 21, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])" + } ], + "edges" : [ { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata new file mode 100644 index 00000000000..2ab8f7a2d51 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json new file mode 100644 index 00000000000..f9730f7e470 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json @@ -0,0 +1,511 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 0 ], [ 1 ], [ 5 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts, a_int, comment], metadata=[]]], fields=[name, ts, a_int, comment])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `a_int` INT, `comment` VARCHAR(2147483647)>", + "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime, a_int, comment])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 1, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 4, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 5, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$3", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "EXPR$4", + "internalName" : "$SUM$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT" + }, { + "name" : "EXPR$5", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 3 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "TUMBLING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT5S" + }, + "namedWindowProperties" : [ { + "name" : "w$start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$rowtime", + "property" : { + "kind" : "Rowtime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$proctime", + "property" : { + "kind" : "Proctime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "EXPR$3", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "EXPR$4", + "fieldType" : "INT" + }, { + "name" : "EXPR$5", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "w$start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "w$end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "w$rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "w$proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, COUNT(*) AS EXPR$3, SUM(a_int) AS EXPR$4, COUNT(DISTINCT comment) AS EXPR$5, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])" + }, { + "id" : 6, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT NULL, `EXPR$4` INT, `EXPR$5` BIGINT NOT NULL>", + "description" : "Calc(select=[name, w$start AS window_start, w$end AS window_end, EXPR$3, EXPR$4, EXPR$5])" + }, { + "id" : 7, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + }, { + "name" : "sum_int", + "dataType" : "INT" + }, { + "name" : "distinct_cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputUpsertKey" : [ 0, 1 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT NULL, `EXPR$4` INT, `EXPR$5` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, window_start, window_end, EXPR$3, EXPR$4, EXPR$5])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata new file mode 100644 index 00000000000..4bb9e4d888e Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json new file mode 100644 index 00000000000..c6f5e4fdd01 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json @@ -0,0 +1,478 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "a_int", + "dataType" : "INT" + }, { + "name" : "b_double", + "dataType" : "DOUBLE" + }, { + "name" : "c_float", + "dataType" : "FLOAT" + }, { + "name" : "d_bigdec", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "comment", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 6 ], [ 1 ], [ 5 ], [ 0 ] ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, a_int, comment, ts], metadata=[]]], fields=[name, a_int, comment, ts])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : "TIMESTAMP(3)" + } ] + }, + "description" : "Calc(select=[name, PROCTIME() AS proctime, a_int, comment, TO_TIMESTAMP(ts) AS rowtime])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "id" : 4, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Calc(select=[name, proctime, a_int, comment])" + }, { + "id" : 5, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "a_int", + "fieldType" : "INT" + }, { + "name" : "comment", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 6, + "type" : "stream-exec-group-window-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1", + "table.local-time-zone" : "default" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "EXPR$2", + "internalName" : "$SUM$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT" + }, { + "name" : "EXPR$3", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 3 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "window" : { + "kind" : "TUMBLING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT5S" + }, + "namedWindowProperties" : [ ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL, `EXPR$2` INT, `EXPR$3` BIGINT NOT NULL>", + "description" : "GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, proctime, 5000)], select=[name, COUNT(*) AS EXPR$1, SUM(a_int) AS EXPR$2, COUNT(DISTINCT comment) AS EXPR$3])" + }, { + "id" : 7, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "cnt", + "dataType" : "BIGINT" + }, { + "name" : "sum_int", + "dataType" : "INT" + }, { + "name" : "distinct_cnt", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL, `EXPR$2` INT, `EXPR$3` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1, EXPR$2, EXPR$3])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata new file mode 100644 index 00000000000..0d56b1add79 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata differ