This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c89933e99d5087f81389560663984012733d3bf8 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Mon Jan 8 09:56:25 2024 -0800 [FLINK-33896] Implement restore tests for Correlate node --- .../nodes/exec/stream/CorrelateRestoreTest.java | 43 +++++ .../nodes/exec/stream/CorrelateTestPrograms.java | 174 +++++++++++++++++ .../plan/correlate-catalog-func.json | 145 ++++++++++++++ .../correlate-catalog-func/savepoint/_metadata | Bin 0 -> 7245 bytes .../plan/correlate-cross-join-unnest.json | 138 ++++++++++++++ .../savepoint/_metadata | Bin 0 -> 7091 bytes .../plan/correlate-join-filter.json | 212 +++++++++++++++++++++ .../correlate-join-filter/savepoint/_metadata | Bin 0 -> 7120 bytes .../plan/correlate-left-join.json | 141 ++++++++++++++ .../correlate-left-join/savepoint/_metadata | Bin 0 -> 7245 bytes .../plan/correlate-system-func.json | 145 ++++++++++++++ .../correlate-system-func/savepoint/_metadata | Bin 0 -> 7245 bytes 12 files changed, 998 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java new file mode 100644 index 00000000000..cc24919cdca --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecCorrelate}. */ +public class CorrelateRestoreTest extends RestoreTestBase { + + public CorrelateRestoreTest() { + super(StreamExecCorrelate.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + CorrelateTestPrograms.CORRELATE_CATALOG_FUNC, + CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC, + CorrelateTestPrograms.CORRELATE_JOIN_FILTER, + CorrelateTestPrograms.CORRELATE_LEFT_JOIN, + CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java new file mode 100644 index 00000000000..d1a2a1e46e3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit; +import org.apache.flink.table.planner.utils.TableFunc1; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCorrelate}. */ +public class CorrelateTestPrograms { + + static final Row[] BEFORE_DATA = {Row.of(1L, 1, "hi#there"), Row.of(2L, 2, "hello#world")}; + + static final Row[] AFTER_DATA = { + Row.of(4L, 4, "foo#bar"), Row.of(3L, 3, "bar#fiz"), + }; + + static final String[] SOURCE_SCHEMA = {"a BIGINT", "b INT NOT NULL", "c VARCHAR"}; + + static final TableTestProgram CORRELATE_CATALOG_FUNC = + TableTestProgram.of( + "correlate-catalog-func", + "validate correlate with temporary catalog function") + .setupTemporaryCatalogFunction("func1", TableFunc1.class) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a VARCHAR", "b VARCHAR") + .consumedBeforeRestore( + "+I[hi#there, $hi]", + "+I[hi#there, $there]", + "+I[hello#world, $hello]", + "+I[hello#world, $world]") + .consumedAfterRestore( + "+I[foo#bar, $foo]", + "+I[foo#bar, $bar]", + "+I[bar#fiz, $bar]", + "+I[bar#fiz, $fiz]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT c, s FROM source_t, LATERAL TABLE(func1(c, '$')) AS T(s)") + .build(); + + static final TableTestProgram CORRELATE_SYSTEM_FUNC = + TableTestProgram.of( + "correlate-system-func", + "validate correlate with temporary system function") + .setupTemporarySystemFunction("STRING_SPLIT", StringSplit.class) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a VARCHAR", "b VARCHAR") + .consumedBeforeRestore( + "+I[hi#there, hi]", + "+I[hi#there, there]", + "+I[hello#world, hello]", + "+I[hello#world, world]") + .consumedAfterRestore( + "+I[foo#bar, foo]", + "+I[foo#bar, bar]", + "+I[bar#fiz, bar]", + "+I[bar#fiz, fiz]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT c, s FROM source_t, LATERAL TABLE(STRING_SPLIT(c, '#')) AS T(s)") + .build(); + + static final TableTestProgram CORRELATE_JOIN_FILTER = + TableTestProgram.of("correlate-join-filter", "validate correlate with join and filter") + .setupTemporaryCatalogFunction("func1", TableFunc1.class) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a VARCHAR", "b VARCHAR") + .consumedBeforeRestore( + "+I[hello#world, hello]", "+I[hello#world, world]") + .consumedAfterRestore("+I[bar#fiz, bar]", "+I[bar#fiz, fiz]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT * FROM (SELECT c, s FROM source_t, LATERAL TABLE(func1(c)) AS T(s)) AS T2 WHERE c LIKE '%hello%' OR c LIKE '%fiz%'") + .build(); + + static final TableTestProgram CORRELATE_LEFT_JOIN = + TableTestProgram.of("correlate-left-join", "validate correlate with left join") + .setupTemporaryCatalogFunction("func1", TableFunc1.class) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a VARCHAR", "b VARCHAR") + .consumedBeforeRestore( + "+I[hi#there, hi]", + "+I[hi#there, there]", + "+I[hello#world, hello]", + "+I[hello#world, world]") + .consumedAfterRestore( + "+I[foo#bar, foo]", + "+I[foo#bar, bar]", + "+I[bar#fiz, bar]", + "+I[bar#fiz, fiz]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT c, s FROM source_t LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE") + .build(); + + static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST = + TableTestProgram.of( + "correlate-cross-join-unnest", + "validate correlate with cross join and unnest") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("name STRING", "arr ARRAY<ROW<nested STRING>>") + .producedBeforeRestore( + Row.of( + "Bob", + new Row[] { + Row.of("1"), Row.of("2"), Row.of("3") + })) + .producedAfterRestore( + Row.of( + "Alice", + new Row[] { + Row.of("4"), Row.of("5"), Row.of("6") + })) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "nested STRING") + .consumedBeforeRestore("+I[Bob, 1]", "+I[Bob, 2]", "+I[Bob, 3]") + .consumedAfterRestore( + "+I[Alice, 4]", "+I[Alice, 5]", "+I[Alice, 6]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT name, nested FROM source_t CROSS JOIN UNNEST(arr) AS T(nested)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json new file mode 100644 index 00000000000..ad40b066e4f --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json @@ -0,0 +1,145 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-correlate_1", + "joinType" : "INNER", + "functionCall" : { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`func1`", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "c", + "expr" : { + "kind" : "CORREL_VARIABLE", + "correl" : "$cor0", + "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `s` VARCHAR(2147483647)> NOT NULL" + } + }, { + "kind" : "LITERAL", + "value" : "$", + "type" : "CHAR(1) NOT NULL" + } ], + "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" + }, + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", + "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')], correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" + }, { + "id" : 3, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Calc(select=[c, EXPR$0 AS s])" + }, { + "id" : 4, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[c, s])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata new file mode 100644 index 00000000000..7866f20d147 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json new file mode 100644 index 00000000000..a72848c4b2e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json @@ -0,0 +1,138 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 18, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "arr", + "dataType" : "ARRAY<ROW<`nested` VARCHAR(2147483647)>>" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` VARCHAR(2147483647)>>>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, arr])", + "inputProperties" : [ ] + }, { + "id" : 19, + "type" : "stream-exec-correlate_1", + "joinType" : "INNER", + "functionCall" : { + "kind" : "CALL", + "internalName" : "$UNNEST_ROWS$1", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "arr", + "expr" : { + "kind" : "CORREL_VARIABLE", + "correl" : "$cor0", + "type" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)> NOT NULL" + } + } ], + "type" : "ROW<`nested` VARCHAR(2147483647)>" + }, + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)>", + "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.arr)], correlate=[table($UNNEST_ROWS$1($cor0.arr))], select=[name,arr,nested], rowType=[RecordType(VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) nested) ARRAY arr, VARCHAR(2147483647) nested)], joinType=[INNER])" + }, { + "id" : 20, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` VARCHAR(2147483647)>", + "description" : "Calc(select=[name, nested])" + }, { + "id" : 21, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "nested", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, nested])" + } ], + "edges" : [ { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata new file mode 100644 index 00000000000..a83a1f37ab6 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json new file mode 100644 index 00000000000..08643e4e9e9 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json @@ -0,0 +1,212 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 9, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ ] + } ] + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, filter=[]]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 10, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$OR$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$LIKE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : "%hello%", + "type" : "CHAR(7) NOT NULL" + } ], + "type" : "BOOLEAN" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$LIKE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : "%fiz%", + "type" : "CHAR(5) NOT NULL" + } ], + "type" : "BOOLEAN" + } ], + "type" : "BOOLEAN" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, b, c], where=[(LIKE(c, '%hello%') OR LIKE(c, '%fiz%'))])" + }, { + "id" : 11, + "type" : "stream-exec-correlate_1", + "joinType" : "INNER", + "functionCall" : { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`func1`", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "c", + "expr" : { + "kind" : "CORREL_VARIABLE", + "correl" : "$cor0", + "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `s` VARCHAR(2147483647)> NOT NULL" + } + } ], + "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" + }, + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", + "description" : "Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" + }, { + "id" : 12, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Calc(select=[c, EXPR$0 AS s])" + }, { + "id" : 13, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[c, s])" + } ], + "edges" : [ { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata new file mode 100644 index 00000000000..225619c71e8 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json new file mode 100644 index 00000000000..c70d6ef7fb7 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json @@ -0,0 +1,141 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 14, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 15, + "type" : "stream-exec-correlate_1", + "joinType" : "LEFT", + "functionCall" : { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`func1`", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "c", + "expr" : { + "kind" : "CORREL_VARIABLE", + "correl" : "$cor0", + "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `s` VARCHAR(2147483647)> NOT NULL" + } + } ], + "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" + }, + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", + "description" : "Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])" + }, { + "id" : 16, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Calc(select=[c, EXPR$0 AS s])" + }, { + "id" : 17, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[c, s])" + } ], + "edges" : [ { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata new file mode 100644 index 00000000000..b5330ece401 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json new file mode 100644 index 00000000000..b05d39229c3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json @@ -0,0 +1,145 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 5, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 6, + "type" : "stream-exec-correlate_1", + "joinType" : "INNER", + "functionCall" : { + "kind" : "CALL", + "systemName" : "STRING_SPLIT", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "c", + "expr" : { + "kind" : "CORREL_VARIABLE", + "correl" : "$cor0", + "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `s` VARCHAR(2147483647)> NOT NULL" + } + }, { + "kind" : "LITERAL", + "value" : "#", + "type" : "CHAR(1) NOT NULL" + } ], + "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" + }, + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", + "description" : "Correlate(invocation=[STRING_SPLIT($cor0.c, _UTF-16LE'#')], correlate=[table(STRING_SPLIT($cor0.c,'#'))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" + }, { + "id" : 7, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Calc(select=[c, EXPR$0 AS s])" + }, { + "id" : 8, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[c, s])" + } ], + "edges" : [ { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata new file mode 100644 index 00000000000..bf0b0864597 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata differ