This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d32af52 [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink d32af52 is described below commit d32af521cbe83f88cd0b822c4d752a1b5102c47c Author: Rui Li <li...@apache.org> AuthorDate: Wed Sep 4 21:27:00 2019 +0800 [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink To support insert overwrite partition. This closes #9615. --- .../flink/connectors/hive/TableEnvHiveConnectorTest.java | 16 ++++++++++++++++ .../flink/table/planner/delegation/PlannerBase.scala | 3 +++ .../apache/flink/table/api/internal/TableEnvImpl.scala | 5 ++++- .../org/apache/flink/table/planner/StreamPlanner.scala | 5 ++++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java index 07dd674..e39999a 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java @@ -177,6 +177,7 @@ public class TableEnvHiveConnectorTest { public void testInsertOverwrite() throws Exception { hiveShell.execute("create database db1"); try { + // non-partitioned hiveShell.execute("create table db1.dest (x int, y string)"); hiveShell.insertInto("db1", "dest").addRow(1, "a").addRow(2, "b").commit(); verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb")); @@ -184,6 +185,21 @@ public class TableEnvHiveConnectorTest { tableEnv.sqlUpdate("insert overwrite db1.dest values (3,'c')"); tableEnv.execute("test insert overwrite"); verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc")); + + // static 
partition + hiveShell.execute("create table db1.part(x int) partitioned by (y int)"); + hiveShell.insertInto("db1", "part").addRow(1, 1).addRow(2, 2).commit(); + tableEnv = getTableEnvWithHiveCatalog(); + tableEnv.sqlUpdate("insert overwrite db1.part partition (y=1) select 100"); + tableEnv.execute("insert overwrite static partition"); + verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "2\t2")); + + // dynamic partition + tableEnv = getTableEnvWithHiveCatalog(); + tableEnv.sqlUpdate("insert overwrite db1.part values (200,2),(3,3)"); + tableEnv.execute("insert overwrite dynamic partition"); + // only overwrite dynamically matched partitions, other existing partitions remain intact + verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3")); } finally { hiveShell.execute("drop database db1 cascade"); } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index f0e18f0..90cdab9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -182,6 +182,9 @@ abstract class PlannerBase( if partitionableSink.getPartitionFieldNames != null && partitionableSink.getPartitionFieldNames.nonEmpty => partitionableSink.setStaticPartition(catalogSink.getStaticPartitions) + case _ => + } + sink match { case overwritableTableSink: OverwritableTableSink => overwritableTableSink.setOverwrite(catalogSink.isOverwrite) case _ => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 0dece49..0e00268 
100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -474,12 +474,15 @@ abstract class TableEnvImpl( objectIdentifier, tableSink) // set static partitions if it is a partitioned table sink - // set whether to overwrite if it's an OverwritableTableSink tableSink match { case partitionableSink: PartitionableTableSink if partitionableSink.getPartitionFieldNames != null && partitionableSink.getPartitionFieldNames.nonEmpty => partitionableSink.setStaticPartition(insertOptions.staticPartitions) + case _ => + } + // set whether to overwrite if it's an OverwritableTableSink + tableSink match { case overwritableTableSink: OverwritableTableSink => overwritableTableSink.setOverwrite(insertOptions.overwrite) case _ => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index 140198b..10a04de 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -160,12 +160,15 @@ class StreamPlanner( identifier, sink) // set static partitions if it is a partitioned sink - // set whether to overwrite if it's an OverwritableTableSink sink match { case partitionableSink: PartitionableTableSink if partitionableSink.getPartitionFieldNames != null && partitionableSink.getPartitionFieldNames.nonEmpty => partitionableSink.setStaticPartition(catalogSink.getStaticPartitions) + case _ => + } + // set whether to overwrite if it's an OverwritableTableSink + sink match { case overwritableTableSink: OverwritableTableSink => overwritableTableSink.setOverwrite(catalogSink.isOverwrite) case _ =>