This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 23629a80c574a9f998b41e258b8e656274714c9d Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Wed Dec 27 11:15:22 2023 -0800 [FLINK-33518] Implement restore tests for WatermarkAssigner node --- .../exec/stream/WatermarkAssignerRestoreTest.java | 40 +++ .../exec/stream/WatermarkAssignerTestPrograms.java | 134 ++++++++++ .../plan/watermark-assigner-basic-filter.json | 259 ++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 9203 bytes .../plan/watermark-assigner-pushdown-metadata.json | 270 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 8688 bytes 6 files changed, 703 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java new file mode 100644 index 00000000000..8ea9f928801 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecWatermarkAssigner}. */ +public class WatermarkAssignerRestoreTest extends RestoreTestBase { + + public WatermarkAssignerRestoreTest() { + super(StreamExecWatermarkAssigner.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + WatermarkAssignerTestPrograms.WATERMARK_ASSIGNER_BASIC_FILTER, + WatermarkAssignerTestPrograms.WATERMARK_ASSIGNER_PUSHDOWN_METADATA); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java new file mode 100644 index 00000000000..23f225c7d7b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerTestPrograms.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.utils.DateTimeUtils; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWatermarkAssigner}. */ +public class WatermarkAssignerTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of( + 2, + 2L, + "Hello", + "2020-04-15 08:00:00", + DateTimeUtils.toLocalDateTime(1586937600000L)), + Row.of(1, 1L, "Hi", "2020-04-15 08:00:01", DateTimeUtils.toLocalDateTime(1586937601000L)), + Row.of( + 3, + 2L, + "Hello world", + "2020-04-15 08:00:02", + DateTimeUtils.toLocalDateTime(1586937602000L)), + Row.of( + 4, + 3L, + "Hello world, how are you?", + "2020-04-15 08:00:03", + DateTimeUtils.toLocalDateTime(1586937603000L)), + Row.of( + 5, + 3L, + "I am fine.", + "2020-04-15 08:00:04", + DateTimeUtils.toLocalDateTime(1586937604000L)), + }; + + static final Row[] AFTER_DATA = { + Row.of(7, 4L, "Ack", "2020-04-15 08:00:21", DateTimeUtils.toLocalDateTime(1586937621000L)), + Row.of(6, 5L, "Syn", "2020-04-15 08:00:23", DateTimeUtils.toLocalDateTime(1586937623000L)), + Row.of( + 8, + 3L, + "Syn-Ack", + "2020-04-15 08:00:25", + DateTimeUtils.toLocalDateTime(1586937625000L)), + Row.of( + 10, + 3L, + "Close", + "2020-04-15 08:00:28", + DateTimeUtils.toLocalDateTime(1586937628000L)) + }; + + static final String[] SOURCE_SCHEMA = { + "a INT", + "b BIGINT", + "c VARCHAR", + "ts_string STRING", + "ts TIMESTAMP(3)", // row_time + "WATERMARK for ts AS ts - INTERVAL '1' SECOND" + }; + + static final String[] SINK_SCHEMA = {"a INT", "b BIGINT", "ts TIMESTAMP(3)"}; + + static final TableTestProgram WATERMARK_ASSIGNER_BASIC_FILTER = + TableTestProgram.of( +
"watermark-assigner-basic-filter", + "validates watermark assigner with basic filtering") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[4, 3, 2020-04-15T08:00:03]", + "+I[5, 3, 2020-04-15T08:00:04]") + .consumedAfterRestore( + "+I[8, 3, 2020-04-15T08:00:25]", + "+I[10, 3, 2020-04-15T08:00:28]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, ts FROM source_t WHERE b = 3") + .build(); + + static final TableTestProgram WATERMARK_ASSIGNER_PUSHDOWN_METADATA = + TableTestProgram.of( + "watermark-assigner-pushdown-metadata", + "validates watermark assigner with pushdown metadata") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addOption("enable-watermark-push-down", "true") + .addOption("readable-metadata", "ts:timestamp(3)") + .addOption("disable-lookup", "true") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[4, 3, 2020-04-15T08:00:03]", + "+I[5, 3, 2020-04-15T08:00:04]") + .consumedAfterRestore( + "+I[8, 3, 2020-04-15T08:00:25]", + "+I[10, 3, 2020-04-15T08:00:28]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, ts FROM source_t WHERE b = 3") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json new file mode 100644 index 00000000000..cd752ea75ca --- /dev/null +++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/plan/watermark-assigner-basic-filter.json @@ -0,0 +1,259 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "ts_string", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "ts", + "dataType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "ts", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`ts` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 4 ] ], + "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b, ts], metadata=[]]], fields=[a, b, ts])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + 
"operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])" + }, { + "id" : 3, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "LITERAL", + "value" : 3, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$=$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "LITERAL", + "value" : 3, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[a, CAST(3 AS BIGINT) AS b, ts], where=[(b = 
3)])" + }, { + "id" : 4, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, ts])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata new file mode 100644 index 
00000000000..211e9cfeb48 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-basic-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json new file mode 100644 index 00000000000..9af37aabf93 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/plan/watermark-assigner-pushdown-metadata.json @@ -0,0 +1,270 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "ts_string", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "ts", + "dataType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "ts", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`ts` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : 
"ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 4 ] ], + "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` INT, `b` BIGINT, `ts` TIMESTAMP(3)> NOT NULL" + }, { + "type" : "WatermarkPushDown", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "idleTimeoutMillis" : -1, + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "watermarkParams" : { + "emitStrategy" : "ON_PERIODIC", + "alignGroupName" : null, + "alignMaxDrift" : "PT0S", + "alignUpdateInterval" : "PT1S", + "sourceIdleTimeout" : -1 + } + }, { + "type" : "FilterPushDown", + "predicates" : [ ] + } ] + }, + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b, ts], metadata=[], watermark=[-(ts, 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], filter=[]]], fields=[a, b, ts])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "LITERAL", + 
"value" : 3, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$=$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "LITERAL", + "value" : 3, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[a, CAST(3 AS BIGINT) AS b, ts], where=[(b = 3)])" + }, { + "id" : 3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "INT" + }, { + "name" : "b", + "fieldType" : 
"BIGINT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, ts])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata new file mode 100644 index 00000000000..f0214304c5d Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-watermark-assigner_1/watermark-assigner-pushdown-metadata/savepoint/_metadata differ