This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e93394b4f2c22e5c50858242c17bcbd8fcf45c3
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Fri Jan 26 17:44:14 2024 -0800

    [FLINK-34248] Remove ChangelogNormalize Json Plan & IT tests
---
 .../exec/stream/ChangelogSourceJsonPlanTest.java   |  96 ------------
 .../jsonplan/ChangelogSourceJsonPlanITCase.java    | 113 -------------
 .../testChangelogSource.out                        | 174 ---------------------
 .../testUpsertSource.out                           | 155 ------------------
 4 files changed, 538 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
deleted file mode 100644
index 0e35bc11bca..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for changelog source, including upsert source. */
-class ChangelogSourceJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
-    }
-
-    @Test
-    void testChangelogSource() {
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d timestamp(3),\n"
-                        + "  PRIMARY KEY (a, b) NOT ENFORCED\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'changelog-mode' = 'I,UA,UB,D',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert into MySink select a, b from MyTable");
-    }
-
-    @Test
-    void testUpsertSource() {
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d timestamp(3),\n"
-                        + "  PRIMARY KEY (a, b) NOT ENFORCED\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'changelog-mode' = 'I,UA,D',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert into MySink select a, b from MyTable");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
deleted file mode 100644
index a6b78346f7c..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** Integration tests for operations on changelog source, including upsert source. */
-class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
-
-    @Test
-    void testChangelogSource() throws Exception {
-        registerChangelogSource();
-        createTestNonInsertOnlyValuesSinkTable(
-                "user_sink",
-                "user_id STRING PRIMARY KEY NOT ENFORCED",
-                "user_name STRING",
-                "email STRING",
-                "balance DECIMAL(18,2)",
-                "balance2 DECIMAL(18,2)");
-
-        String dml = "INSERT INTO user_sink SELECT * FROM users";
-        compileSqlAndExecutePlan(dml).await();
-
-        List<String> expected =
-                Arrays.asList(
-                        "+I[user1, Tom, tom...@gmail.com, 8.10, 16.20]",
-                        "+I[user3, Bailey, bai...@qq.com, 9.99, 19.98]",
-                        "+I[user4, Tina, t...@gmail.com, 11.30, 22.60]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink"));
-    }
-
-    @Test
-    void testToUpsertSource() throws Exception {
-        registerUpsertSource();
-        createTestNonInsertOnlyValuesSinkTable(
-                "user_sink",
-                "user_id STRING PRIMARY KEY NOT ENFORCED",
-                "user_name STRING",
-                "email STRING",
-                "balance DECIMAL(18,2)",
-                "balance2 DECIMAL(18,2)");
-
-        String dml = "INSERT INTO user_sink SELECT * FROM users";
-        compileSqlAndExecutePlan(dml).await();
-
-        List<String> expected =
-                Arrays.asList(
-                        "+I[user1, Tom, tom...@gmail.com, 8.10, 16.20]",
-                        "+I[user3, Bailey, bai...@qq.com, 9.99, 19.98]",
-                        "+I[user4, Tina, t...@gmail.com, 11.30, 22.60]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink"));
-    }
-
-    // ------------------------------------------------------------------------------------------
-
-    protected void registerChangelogSource() {
-        Map<String, String> properties = new HashMap<>();
-        properties.put("changelog-mode", "I,UA,UB,D");
-        createTestValuesSourceTable(
-                "users",
-                JavaScalaConversionUtil.toJava(TestData.userChangelog()),
-                new String[] {
-                    "user_id STRING",
-                    "user_name STRING",
-                    "email STRING",
-                    "balance DECIMAL(18,2)",
-                    "balance2 AS balance * 2"
-                },
-                properties);
-    }
-
-    protected void registerUpsertSource() {
-        Map<String, String> properties = new HashMap<>();
-        properties.put("changelog-mode", "I,UA,D");
-        createTestValuesSourceTable(
-                "users",
-                JavaScalaConversionUtil.toJava(TestData.userUpsertlog()),
-                new String[] {
-                    "user_id STRING PRIMARY KEY NOT ENFORCED",
-                    "user_name STRING",
-                    "email STRING",
-                    "balance DECIMAL(18,2)",
-                    "balance2 AS balance * 2"
-                },
-                properties);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out
deleted file mode 100644
index 50e4add8f4a..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out
+++ /dev/null
@@ -1,174 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT NOT NULL"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ],
-            "primaryKey" : {
-              "name" : "PK_a_b",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "a", "b" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "changelog-mode" : "I,UA,UB,D",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-drop-update-before_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "DropUpdateBefore"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "Exchange(distribution=[hash[a, b]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-changelog-normalize_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "uniqueKeys" : [ 0, 1 ],
-    "generateUpdateBefore" : true,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "changelogNormalizeState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "ChangelogNormalize(key=[a, b])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "inputUpsertKey" : [ 0, 1 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out
deleted file mode 100644
index 1d7925cd82e..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out
+++ /dev/null
@@ -1,155 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT NOT NULL"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ],
-            "primaryKey" : {
-              "name" : "PK_a_b",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "a", "b" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "changelog-mode" : "I,UA,D",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "Exchange(distribution=[hash[a, b]])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-changelog-normalize_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "uniqueKeys" : [ 0, 1 ],
-    "generateUpdateBefore" : true,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "changelogNormalizeState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "ChangelogNormalize(key=[a, b])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "inputUpsertKey" : [ 0, 1 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}

Reply via email to