This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23629a80c574a9f998b41e258b8e656274714c9d
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Wed Dec 27 11:15:22 2023 -0800

    [FLINK-33518] Implement restore tests for WatermarkAssigner node
---
 .../exec/stream/WatermarkAssignerRestoreTest.java  |  40 +++
 .../exec/stream/WatermarkAssignerTestPrograms.java | 134 ++++++++++
 .../plan/watermark-assigner-basic-filter.json      | 259 ++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 9203 bytes
 .../plan/watermark-assigner-pushdown-metadata.json | 270 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8688 bytes
 6 files changed, 703 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java
new file mode 100644
index 00000000000..8ea9f928801
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecWatermarkAssigner}. */
+public class WatermarkAssignerRestoreTest extends RestoreTestBase {
+
+    public WatermarkAssignerRestoreTest() {
+        super(StreamExecWatermarkAssigner.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                WatermarkAssignerTestPrograms.WATERMARK_ASSIGNER_BASIC_FILTER,
+                
WatermarkAssignerTestPrograms.WATERMARK_ASSIGNER_PUSHDOWN_METADATA);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java
new file mode 100644
index 00000000000..23f225c7d7b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecWatermarkAssigner}. */
+public class WatermarkAssignerTestPrograms {
+
+    static final Row[] BEFORE_DATA = {
+        Row.of(
+                2,
+                2L,
+                "Hello",
+                "2020-04-15 08:00:00",
+                DateTimeUtils.toLocalDateTime(1586937600000L)),
+        Row.of(1, 1L, "Hi", "2020-04-15 08:00:01", 
DateTimeUtils.toLocalDateTime(1586937601000L)),
+        Row.of(
+                3,
+                2L,
+                "Hello world",
+                "2020-04-15 08:00:02",
+                DateTimeUtils.toLocalDateTime(1586937602000L)),
+        Row.of(
+                4,
+                3L,
+                "Hello world, how are you?",
+                "2020-04-15 08:00:03",
+                DateTimeUtils.toLocalDateTime(1586937603000L)),
+        Row.of(
+                5,
+                3L,
+                "I am fine.",
+                "2020-04-15 08:00:04",
+                DateTimeUtils.toLocalDateTime(1586937604000L)),
+    };
+
+    static final Row[] AFTER_DATA = {
+        Row.of(7, 4L, "Ack", "2020-04-15 08:00:21", 
DateTimeUtils.toLocalDateTime(1586937621000L)),
+        Row.of(6, 5L, "Syn", "2020-04-15 08:00:23", 
DateTimeUtils.toLocalDateTime(1586937623000L)),
+        Row.of(
+                8,
+                3L,
+                "Syn-Ack",
+                "2020-04-15 08:00:25",
+                DateTimeUtils.toLocalDateTime(1586937625000L)),
+        Row.of(
+                10,
+                3L,
+                "Close",
+                "2020-04-15 08:00:28",
+                DateTimeUtils.toLocalDateTime(1586937628000L))
+    };
+
+    static final String[] SOURCE_SCHEMA = {
+        "a INT",
+        "b BIGINT",
+        "c VARCHAR",
+        "ts_string STRING",
+        "ts TIMESTAMP(3)", // row_time
+        "WATERMARK for ts AS ts - INTERVAL '1' SECOND"
+    };
+
+    static final String[] SINK_SCHEMA = {"a INT", "b BIGINT", "ts 
TIMESTAMP(3)"};
+
+    static final TableTestProgram WATERMARK_ASSIGNER_BASIC_FILTER =
+            TableTestProgram.of(
+                            "watermark-assigner-basic-filter",
+                            "validates watermark assigner with basic 
filtering")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[4, 3, 2020-04-15T08:00:03]",
+                                            "+I[5, 3, 2020-04-15T08:00:04]")
+                                    .consumedAfterRestore(
+                                            "+I[8, 3, 2020-04-15T08:00:25]",
+                                            "+I[10, 3, 2020-04-15T08:00:28]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, ts FROM source_t 
WHERE b = 3")
+                    .build();
+
+    static final TableTestProgram WATERMARK_ASSIGNER_PUSHDOWN_METADATA =
+            TableTestProgram.of(
+                            "watermark-assigner-pushdown-metadata",
+                            "validates watermark assigner with pushdown 
metadata")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("enable-watermark-push-down", 
"true")
+                                    .addOption("readable-metadata", 
"ts:timestamp(3)")
+                                    .addOption("disable-lookup", "true")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[4, 3, 2020-04-15T08:00:03]",
+                                            "+I[5, 3, 2020-04-15T08:00:04]")
+                                    .consumedAfterRestore(
+                                            "+I[8, 3, 2020-04-15T08:00:25]",
+                                            "+I[10, 3, 2020-04-15T08:00:28]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, ts FROM source_t 
WHERE b = 3")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json
new file mode 100644
index 00000000000..cd752ea75ca
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json
@@ -0,0 +1,259 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "ts_string",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "ts",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "ts",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 4,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`ts` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 4 ] ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[a, b, ts], metadata=[]]], fields=[a, b, 
ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 
1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$=$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, CAST(3 AS BIGINT) AS b, ts], where=[(b = 
3)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "ts",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, ts])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata
new file mode 100644
index 00000000000..211e9cfeb48
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json
new file mode 100644
index 00000000000..9af37aabf93
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json
@@ -0,0 +1,270 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "ts_string",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "ts",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "ts",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 4,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`ts` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 4 ] ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "WatermarkPushDown",
+        "watermarkExpr" : {
+          "kind" : "CALL",
+          "syntax" : "SPECIAL",
+          "internalName" : "$-$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "TIMESTAMP(3)"
+          }, {
+            "kind" : "LITERAL",
+            "value" : "1000",
+            "type" : "INTERVAL SECOND(6) NOT NULL"
+          } ],
+          "type" : "TIMESTAMP(3)"
+        },
+        "idleTimeoutMillis" : -1,
+        "producedType" : {
+          "type" : "ROW",
+          "nullable" : false,
+          "fields" : [ {
+            "name" : "a",
+            "fieldType" : "INT"
+          }, {
+            "name" : "b",
+            "fieldType" : "BIGINT"
+          }, {
+            "name" : "ts",
+            "fieldType" : {
+              "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+              "precision" : 3,
+              "kind" : "ROWTIME"
+            }
+          } ]
+        },
+        "watermarkParams" : {
+          "emitStrategy" : "ON_PERIODIC",
+          "alignGroupName" : null,
+          "alignMaxDrift" : "PT0S",
+          "alignUpdateInterval" : "PT1S",
+          "sourceIdleTimeout" : -1
+        }
+      }, {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[a, b, ts], metadata=[], watermark=[-(ts, 
1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], filter=[]]], 
fields=[a, b, ts])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$=$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, CAST(3 AS BIGINT) AS b, ts], where=[(b = 
3)])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "ts",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, ts])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata
new file mode 100644
index 00000000000..f0214304c5d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata
 differ

Reply via email to