This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34ec781ac7547d376d09854983d169a3aca5130f
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Wed Nov 15 21:04:39 2023 -0800

    [FLINK-33564] Implement restore tests for GroupWindowAggregate node
---
 .../flink/table/test/program/TableTestStep.java    |   5 +
 .../GroupWindowAggregateEventTimeRestoreTest.java  |  41 ++
 .../GroupWindowAggregateProcTimeRestoreTest.java   |  42 ++
 .../stream/GroupWindowAggregateTestPrograms.java   | 248 ++++++++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  93 +++-
 .../group-window-aggregate-hop-event-time.json     | 328 +++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 12035 bytes
 .../plan/group-window-aggregate-hop-proc-time.json | 414 +++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 11660 bytes
 .../group-window-aggregate-session-event-time.json | 326 +++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 12340 bytes
 .../group-window-aggregate-session-proc-time.json  | 412 +++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 11941 bytes
 .../group-window-aggregate-tumble-event-time.json  | 511 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 22303 bytes
 .../group-window-aggregate-tumble-proc-time.json   | 478 +++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 18217 bytes
 17 files changed, 2874 insertions(+), 24 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
index 1d0207f7126..d1ff6b8d018 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
@@ -99,6 +99,11 @@ public abstract class TableTestStep implements TestStep {
             return (SpecificBuilder) this;
         }
 
+        public SpecificBuilder addSchema(List<String> schemaComponents) {
+            this.schemaComponents.addAll(schemaComponents); // appends; existing entries are kept
+            return (SpecificBuilder) this; // returns the builder itself for call chaining
+        }
+
         /**
          * Unless the test requires a very specific configuration, try to avoid calling this method
          * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java
new file mode 100644
index 00000000000..b583416f530
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateEventTimeRestoreTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecGroupWindowAggregate} with event-time group windows. */
+public class GroupWindowAggregateEventTimeRestoreTest extends RestoreTestBase {
+
+    public GroupWindowAggregateEventTimeRestoreTest() {
+        super(StreamExecGroupWindowAggregate.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                GroupWindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_EVENT_TIME,
+                GroupWindowAggregateTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME,
+                GroupWindowAggregateTestPrograms.GROUP_SESSION_WINDOW_EVENT_TIME);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java
new file mode 100644
index 00000000000..f33fa80003c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateProcTimeRestoreTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecGroupWindowAggregate} with proc-time group windows. */
+public class GroupWindowAggregateProcTimeRestoreTest extends RestoreTestBase {
+
+    public GroupWindowAggregateProcTimeRestoreTest() {
+        super(StreamExecGroupWindowAggregate.class, AfterRestoreSource.INFINITE);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+
+        return Arrays.asList(
+                GroupWindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_PROC_TIME,
+                GroupWindowAggregateTestPrograms.GROUP_HOP_WINDOW_PROC_TIME,
+                GroupWindowAggregateTestPrograms.GROUP_SESSION_WINDOW_PROC_TIME);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java
new file mode 100644
index 00000000000..03f91f47efe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateTestPrograms.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecGroupWindowAggregate}. */
+public class GroupWindowAggregateTestPrograms {
+
+    static final Row[] BEFORE_DATA = {
+        Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"),
+        Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"),
+        Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"),
+        Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"),
+        Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+        // out of order: rowtime 00:00:06 arrives after 00:00:07
+        Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"),
+        Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"),
+        // late event: rowtime 00:00:04 arrives after 00:00:08 has already been seen
+        Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"),
+        Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"),
+        Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null),
+        Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b")
+    };
+
+    static final Row[] AFTER_DATA = {
+        Row.of("2020-10-10 00:00:41", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"),
+        Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"),
+        Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"),
+        Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d")
+    };
+
+    static final SourceTestStep SOURCE =
+            SourceTestStep.newBuilder("source_t")
+                    .addSchema(
+                            "ts STRING",
+                            "a_int INT",
+                            "b_double DOUBLE",
+                            "c_float FLOAT",
+                            "d_bigdec DECIMAL(10, 2)",
+                            "`comment` STRING",
+                            "name STRING",
+                            "`rowtime` AS TO_TIMESTAMP(`ts`)",
+                            "`proctime` AS PROCTIME()",
+                            "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND")
+                    .producedBeforeRestore(BEFORE_DATA)
+                    .producedAfterRestore(AFTER_DATA)
+                    .build();
+
+    static final TableTestProgram GROUP_TUMBLE_WINDOW_EVENT_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-tumble-event-time",
+                            "validates group by using tumbling window with event time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "name STRING",
+                                            "window_start TIMESTAMP(3)",
+                                            "window_end TIMESTAMP(3)",
+                                            "cnt BIGINT",
+                                            "sum_int INT",
+                                            "distinct_cnt BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
+                                            "+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]",
+                                            "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]",
+                                            "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]")
+                                    .consumedAfterRestore(
+                                            "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 1]",
+                                            "+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 7, 0]",
+                                            "+I[a, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 10, 1]",
+                                            "+I[c, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 1, 12, 1]",
+                                            "+I[d, 2020-10-10T00:00:40, 2020-10-10T00:00:45, 2, 24, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS window_start, "
+                                    + "TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS window_end, "
+                                    + "COUNT(*), "
+                                    + "SUM(a_int), "
+                                    + "COUNT(DISTINCT `comment`) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)")
+                    .build();
+
+    static final TableTestProgram GROUP_TUMBLE_WINDOW_PROC_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-tumble-proc-time",
+                            "validates group by using tumbling window with processing time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "name STRING",
+                                            "cnt BIGINT",
+                                            "sum_int INT",
+                                            "distinct_cnt BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[a, 6, 18, 3]",
+                                            "+I[null, 1, 7, 0]",
+                                            "+I[b, 4, 14, 3]")
+                                    .consumedAfterRestore(
+                                            "+I[a, 1, 10, 1]", "+I[c, 1, 12, 1]", "+I[d, 2, 24, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "COUNT(*), "
+                                    + "SUM(a_int), "
+                                    + "COUNT(DISTINCT `comment`) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, TUMBLE(proctime, INTERVAL '5' SECOND)")
+                    .build();
+
+    static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-hop-event-time",
+                            "validates group by using hopping window with event time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "cnt BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[a, 4]",
+                                            "+I[b, 2]",
+                                            "+I[a, 6]",
+                                            "+I[a, 1]",
+                                            "+I[b, 2]",
+                                            "+I[b, 1]",
+                                            "+I[b, 1]")
+                                    .consumedAfterRestore(
+                                            "+I[b, 1]",
+                                            "+I[null, 1]",
+                                            "+I[b, 1]",
+                                            "+I[null, 1]",
+                                            "+I[a, 1]",
+                                            "+I[d, 2]",
+                                            "+I[c, 1]",
+                                            "+I[a, 1]",
+                                            "+I[c, 1]",
+                                            "+I[d, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "COUNT(*) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)")
+                    .build();
+
+    static final TableTestProgram GROUP_HOP_WINDOW_PROC_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-hop-proc-time",
+                            "validates group by using hopping window with processing time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "cnt BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[a, 6]",
+                                            "+I[b, 4]",
+                                            "+I[null, 1]",
+                                            "+I[a, 6]",
+                                            "+I[null, 1]",
+                                            "+I[b, 4]")
+                                    .consumedAfterRestore(
+                                            "+I[a, 1]",
+                                            "+I[d, 2]",
+                                            "+I[c, 1]",
+                                            "+I[a, 1]",
+                                            "+I[c, 1]",
+                                            "+I[d, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "COUNT(*) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, HOP(proctime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)")
+                    .build();
+
+    static final TableTestProgram GROUP_SESSION_WINDOW_EVENT_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-session-event-time",
+                            "validates group by using session window with event time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "cnt BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[a, 4]", "+I[b, 2]", "+I[a, 1]", "+I[b, 1]")
+                                    .consumedAfterRestore(
+                                            "+I[null, 1]",
+                                            "+I[b, 1]",
+                                            "+I[a, 1]",
+                                            "+I[c, 1]",
+                                            "+I[d, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "COUNT(*) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, SESSION(rowtime, INTERVAL '3' SECOND)")
+                    .build();
+
+    static final TableTestProgram GROUP_SESSION_WINDOW_PROC_TIME =
+            TableTestProgram.of(
+                            "group-window-aggregate-session-proc-time",
+                            "validates group by using session window with processing time")
+                    .setupTableSource(SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "cnt BIGINT")
+                                    .consumedBeforeRestore("+I[a, 6]", "+I[null, 1]", "+I[b, 4]")
+                                    .consumedAfterRestore("+I[a, 1]", "+I[c, 1]", "+I[d, 2]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "name, "
+                                    + "COUNT(*) "
+                                    + "FROM source_t "
+                                    + "GROUP BY name, SESSION(proctime, INTERVAL '3' SECOND)")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 59a72d28fff..aed76d7a3c2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -81,9 +81,26 @@ import static org.assertj.core.api.Assertions.assertThat;
 public abstract class RestoreTestBase implements TableTestProgramRunner {
 
     private final Class<? extends ExecNode> execNodeUnderTest;
+    private final AfterRestoreSource afterRestoreSource;
 
     protected RestoreTestBase(Class<? extends ExecNode> execNodeUnderTest) {
         this.execNodeUnderTest = execNodeUnderTest;
+        this.afterRestoreSource = AfterRestoreSource.FINITE; // default when none is given
+    }
+
+    protected RestoreTestBase(
+            Class<? extends ExecNode> execNodeUnderTest, AfterRestoreSource state) {
+        this.execNodeUnderTest = execNodeUnderTest;
+        this.afterRestoreSource = state;
+    }
+
+    /**
+     * AfterRestoreSource defines the source behavior while running {@link
+     * RestoreTestBase#testRestore}.
+     */
+    protected enum AfterRestoreSource {
+        FINITE, // testRestore awaits job completion, then asserts the sink contents
+        INFINITE // testRestore waits for the sink observers, then cancels the job
     }
 
     @Override
@@ -117,6 +134,31 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
                                 supportedPrograms().stream().map(p -> Arguments.of(p, metadata)));
     }
 
+    private void registerSinkObserver(
+            final List<CompletableFuture<?>> futures,
+            final SinkTestStep sinkTestStep,
+            final boolean ignoreAfter) {
+        final CompletableFuture<Object> future = new CompletableFuture<>();
+        futures.add(future);
+        final String tableName = sinkTestStep.name;
+        TestValuesTableFactory.registerLocalRawResultsObserver(
+                tableName,
+                (integer, strings) -> { // callback args unused; results are re-read below
+                    List<String> results = new ArrayList<>();
+                    results.addAll(sinkTestStep.getExpectedBeforeRestoreAsStrings());
+                    if (!ignoreAfter) { // also expect the after-restore rows
+                        results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
+                    }
+                    final boolean shouldComplete =
+                            CollectionUtils.isEqualCollection(
+                                    TestValuesTableFactory.getRawResultsAsStrings(tableName),
+                                    results);
+                    if (shouldComplete) {
+                        future.complete(null); // sink contents match the expected rows
+                    }
+                });
+    }
+
     /**
      * Execute this test to generate test files. Remember to be using the correct branch when
      * generating the test files.
@@ -145,20 +187,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
 
         final List<CompletableFuture<?>> futures = new ArrayList<>();
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-            final CompletableFuture<Object> future = new CompletableFuture<>();
-            futures.add(future);
-            final String tableName = sinkTestStep.name;
-            TestValuesTableFactory.registerLocalRawResultsObserver(
-                    tableName,
-                    (integer, strings) -> {
-                        final boolean shouldTakeSavepoint =
-                                CollectionUtils.isEqualCollection(
-                                        TestValuesTableFactory.getRawResultsAsStrings(tableName),
-                                        sinkTestStep.getExpectedBeforeRestoreAsStrings());
-                        if (shouldTakeSavepoint) {
-                            future.complete(null);
-                        }
-                    });
+            registerSinkObserver(futures, sinkTestStep, true); // true: before-restore rows only
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("disable-lookup", "true");
@@ -202,6 +231,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
                 .set(
                         TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                         TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
+
         for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
             final String id = TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore);
             final Map<String, String> options = new HashMap<>();
@@ -209,10 +239,18 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
             options.put("data-id", id);
             options.put("disable-lookup", "true");
             options.put("runtime-source", "NewSource");
+            if (afterRestoreSource == AfterRestoreSource.INFINITE) {
+                options.put("terminating", "false"); // presumably keeps the source running
+            }
             sourceTestStep.apply(tEnv, options);
         }
 
+        final List<CompletableFuture<?>> futures = new ArrayList<>();
+
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            if (afterRestoreSource == AfterRestoreSource.INFINITE) {
+                registerSinkObserver(futures, sinkTestStep, false); // before + after rows
+            }
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("disable-lookup", "true");
@@ -224,16 +262,23 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
 
         final CompiledPlan compiledPlan =
                 tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata)));
-        compiledPlan.execute().await();
-        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-            assertThat(TestValuesTableFactory.getRawResultsAsStrings(sinkTestStep.name))
-                    .containsExactlyInAnyOrder(
-                            Stream.concat(
-                                            sinkTestStep.getExpectedBeforeRestoreAsStrings()
-                                                    .stream(),
-                                            sinkTestStep.getExpectedAfterRestoreAsStrings()
-                                                    .stream())
-                                    .toArray(String[]::new));
+
+        if (afterRestoreSource == AfterRestoreSource.INFINITE) { // wait for sinks, then cancel job
+            final TableResult tableResult = compiledPlan.execute();
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+            tableResult.getJobClient().get().cancel().get(); // only reached once all sinks matched
+        } else {
+            compiledPlan.execute().await();
+            for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+                assertThat(TestValuesTableFactory.getRawResultsAsStrings(sinkTestStep.name))
+                        .containsExactlyInAnyOrder(
+                                Stream.concat(
+                                                sinkTestStep.getExpectedBeforeRestoreAsStrings()
+                                                        .stream(),
+                                                sinkTestStep.getExpectedAfterRestoreAsStrings()
+                                                        .stream())
+                                        .toArray(String[]::new));
+            }
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json
new file mode 100644
index 00000000000..626e43edc9b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/plan/group-window-aggregate-hop-event-time.json
@@ -0,0 +1,328 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 0 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "SLIDING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT10S",
+      "slide" : "PT5S"
+    },
+    "namedWindowProperties" : [ ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "GroupWindowAggregate(groupBy=[name], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[name, COUNT(*) AS EXPR$1])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata
new file mode 100644
index 00000000000..2272a348f91
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-event-time/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json
new file mode 100644
index 00000000000..a3db2384730
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/plan/group-window-aggregate-hop-proc-time.json
@@ -0,0 +1,414 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 0 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[name, PROCTIME() AS proctime, TO_TIMESTAMP(ts) AS rowtime])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[name, proctime])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "SLIDING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT10S",
+      "slide" : "PT5S"
+    },
+    "namedWindowProperties" : [ ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "GroupWindowAggregate(groupBy=[name], window=[SlidingGroupWindow('w$, proctime, 10000, 5000)], select=[name, COUNT(*) AS EXPR$1])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata
new file mode 100644
index 00000000000..728ac0ac490
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-hop-proc-time/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json
new file mode 100644
index 00000000000..71cc9b36384
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/plan/group-window-aggregate-session-event-time.json
@@ -0,0 +1,326 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 14,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 0 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "SESSION",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "gap" : "PT3S"
+    },
+    "namedWindowProperties" : [ ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "GroupWindowAggregate(groupBy=[name], window=[SessionGroupWindow('w$, rowtime, 3000)], select=[name, COUNT(*) AS EXPR$1])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata
new file mode 100644
index 00000000000..b5d23f6cf79
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-event-time/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json
new file mode 100644
index 00000000000..bec28ec65bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/plan/group-window-aggregate-session-proc-time.json
@@ -0,0 +1,412 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 15,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 0 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` 
VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[name, ts], metadata=[]]], fields=[name, 
ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[name, PROCTIME() AS proctime, 
TO_TIMESTAMP(ts) AS rowtime])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[name, proctime])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "SESSION",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "gap" : "PT3S"
+    },
+    "namedWindowProperties" : [ ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "GroupWindowAggregate(groupBy=[name], 
window=[SessionGroupWindow('w$, proctime, 3000)], select=[name, COUNT(*) AS 
EXPR$1])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata
new file mode 100644
index 00000000000..2ab8f7a2d51
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-session-proc-time/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json
new file mode 100644
index 00000000000..f9730f7e470
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/plan/group-window-aggregate-tumble-event-time.json
@@ -0,0 +1,511 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 0 ], [ 1 ], [ 5 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` 
VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `ts` 
VARCHAR(2147483647), `a_int` INT, `comment` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `ts` VARCHAR(2147483647), 
`a_int` INT, `comment` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[name, ts, a_int, comment], metadata=[]]], 
fields=[name, ts, a_int, comment])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), 
`a_int` INT, `comment` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, TO_TIMESTAMP(ts) AS rowtime, a_int, 
comment])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$3",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "EXPR$4",
+      "internalName" : "$SUM$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT"
+    }, {
+      "name" : "EXPR$5",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 3 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "TUMBLING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT5S"
+    },
+    "namedWindowProperties" : [ {
+      "name" : "w$start",
+      "property" : {
+        "kind" : "WindowStart",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$end",
+      "property" : {
+        "kind" : "WindowEnd",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$rowtime",
+      "property" : {
+        "kind" : "Rowtime",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$proctime",
+      "property" : {
+        "kind" : "Proctime",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    } ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "EXPR$3",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "EXPR$4",
+        "fieldType" : "INT"
+      }, {
+        "name" : "EXPR$5",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "w$start",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$end",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "w$proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "GroupWindowAggregate(groupBy=[name], 
window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[name, COUNT(*) AS EXPR$3, SUM(a_int) AS EXPR$4, 
COUNT(DISTINCT comment) AS EXPR$5, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `window_start` 
TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT 
NULL, `EXPR$4` INT, `EXPR$5` BIGINT NOT NULL>",
+    "description" : "Calc(select=[name, w$start AS window_start, w$end AS 
window_end, EXPR$3, EXPR$4, EXPR$5])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "window_start",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "window_end",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "sum_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "distinct_cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKey" : [ 0, 1 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `window_start` 
TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT 
NULL, `EXPR$4` INT, `EXPR$5` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, window_start, window_end, EXPR$3, EXPR$4, EXPR$5])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata
new file mode 100644
index 00000000000..4bb9e4d888e
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-event-time/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json
new file mode 100644
index 00000000000..c6f5e4fdd01
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/plan/group-window-aggregate-tumble-proc-time.json
@@ -0,0 +1,478 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "a_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "b_double",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "c_float",
+              "dataType" : "FLOAT"
+            }, {
+              "name" : "d_bigdec",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "comment",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 7,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 6 ], [ 1 ], [ 5 ], [ 0 ] ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, 
`comment` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, 
`comment` VARCHAR(2147483647), `ts` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `a_int` INT, `comment` 
VARCHAR(2147483647), `ts` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[name, a_int, comment, ts], metadata=[]]], 
fields=[name, a_int, comment, ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[name, PROCTIME() AS proctime, a_int, 
comment, TO_TIMESTAMP(ts) AS rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 4,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Calc(select=[name, proctime, a_int, comment])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a_int",
+        "fieldType" : "INT"
+      }, {
+        "name" : "comment",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-group-window-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1",
+      "table.local-time-zone" : "default"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "EXPR$2",
+      "internalName" : "$SUM$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT"
+    }, {
+      "name" : "EXPR$3",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 3 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "window" : {
+      "kind" : "TUMBLING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT5S"
+    },
+    "namedWindowProperties" : [ ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL, 
`EXPR$2` INT, `EXPR$3` BIGINT NOT NULL>",
+    "description" : "GroupWindowAggregate(groupBy=[name], 
window=[TumblingGroupWindow('w$, proctime, 5000)], select=[name, COUNT(*) AS 
EXPR$1, SUM(a_int) AS EXPR$2, COUNT(DISTINCT comment) AS EXPR$3])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "sum_int",
+              "dataType" : "INT"
+            }, {
+              "name" : "distinct_cnt",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `EXPR$1` BIGINT NOT NULL, 
`EXPR$2` INT, `EXPR$3` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, EXPR$1, EXPR$2, EXPR$3])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata
new file mode 100644
index 00000000000..0d56b1add79
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-window-aggregate_1/group-window-aggregate-tumble-proc-time/savepoint/_metadata
 differ

Reply via email to