This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 07dfffe [FLINK-13814][hive] HiveTableSink should strip quotes from partition values 07dfffe is described below commit 07dfffec8248788630bff5e99afe9866d8b50487 Author: Rui Li <li...@apache.org> AuthorDate: Thu Aug 22 14:53:26 2019 +0800 [FLINK-13814][hive] HiveTableSink should strip quotes from partition values Strip quotes from partition value in order to get proper string values. This closes #9502. --- .../flink/connectors/hive/HiveTableSinkTest.java | 57 ---------------------- .../connectors/hive/TableEnvHiveConnectorTest.java | 52 ++++++++++++++++++++ .../apache/flink/sql/parser/dml/RichSqlInsert.java | 7 ++- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java index 13bddc0..51a56fb 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java @@ -27,7 +27,6 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; @@ -41,7 +40,6 @@ import org.apache.flink.types.Row; import com.klarna.hiverunner.HiveShell; import com.klarna.hiverunner.annotations.HiveSQL; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.mapred.JobConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -107,31 +105,6 @@ public class HiveTableSinkTest { } @Test - public void testInsertIntoDynamicPartition() throws Exception { - String dbName = "default"; - String tblName = "dest"; - RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1); - ObjectPath tablePath = new ObjectPath(dbName, tblName); - - TableEnvironment tableEnv = HiveTestUtils.createTableEnv(); - - List<Row> toWrite = generateRecords(5); - Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo)); - tableEnv.registerTable("src", src); - - tableEnv.registerCatalog("hive", hiveCatalog); - tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest"); - tableEnv.execute("mytest"); - - List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath); - assertEquals(toWrite.size(), partitionSpecs.size()); - - verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); - - hiveCatalog.dropTable(tablePath, false); - } - - @Test public void testWriteComplexType() throws Exception { String dbName = "default"; String tblName = "dest"; @@ -213,36 +186,6 @@ public class HiveTableSinkTest { hiveCatalog.dropTable(tablePath, false); } - @Test - public void testInsertIntoStaticPartition() throws Exception { - String dbName = "default"; - String tblName = "dest"; - RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1); - ObjectPath tablePath = new ObjectPath(dbName, tblName); - - TableEnvironment tableEnv = HiveTestUtils.createTableEnv(); - List<Row> toWrite = generateRecords(1); - Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo)); - tableEnv.registerTable("src", src); - - Map<String, String> partSpec = new HashMap<>(); - partSpec.put("s", "a"); - - CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath); - HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table); - hiveTableSink.setStaticPartition(partSpec); - tableEnv.registerTableSink("destSink", hiveTableSink); - tableEnv.sqlQuery("select * from src").insertInto("destSink"); - tableEnv.execute("mytest"); - - // make sure new partition is created - assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size()); - - verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); - - hiveCatalog.dropTable(tablePath, false); - } - private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception { CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols); hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java index ebf9901..07dd674 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; @@ -188,6 +189,57 @@ public class TableEnvHiveConnectorTest { } } + @Test + public void testStaticPartition() throws Exception { + hiveShell.execute("create database db1"); + try { + hiveShell.execute("create table db1.src (x int)"); + hiveShell.insertInto("db1", "src").addRow(1).addRow(2).commit(); + hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)"); + TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); + tableEnv.sqlUpdate("insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src"); + tableEnv.execute("static partitioning"); + assertEquals(1, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size()); + verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1")); + } finally { + hiveShell.execute("drop database db1 cascade"); + } + } + + @Test + public void testDynamicPartition() throws Exception { + hiveShell.execute("create database db1"); + try { + hiveShell.execute("create table db1.src (x int, y string, z double)"); + hiveShell.insertInto("db1", "src").addRow(1, "a", 1.1).addRow(2, "a", 2.2).addRow(3, "b", 3.3).commit(); + hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)"); + TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); + tableEnv.sqlUpdate("insert into db1.dest select * from db1.src"); + tableEnv.execute("dynamic partitioning"); + assertEquals(3, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size()); + verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3")); + } finally { + hiveShell.execute("drop database db1 cascade"); + } + } + + @Test + public void testPartialDynamicPartition() throws Exception { + hiveShell.execute("create database db1"); + try { + hiveShell.execute("create table db1.src (x int, y string)"); + hiveShell.insertInto("db1", "src").addRow(1, "a").addRow(2, "b").commit(); + hiveShell.execute("create table db1.dest (x int) partitioned by (p1 double, p2 string)"); + TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); + tableEnv.sqlUpdate("insert into db1.dest partition (p1=1.1) select x,y from db1.src"); + tableEnv.execute("partial dynamic partitioning"); + assertEquals(2, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size()); + verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb")); + } finally { + hiveShell.execute("drop database db1 cascade"); + } + } + private TableEnvironment getTableEnvWithHiveCatalog() { TableEnvironment tableEnv = HiveTestUtils.createTableEnv(); tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java index d9df258..4b681b2 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java @@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.NlsString; import java.util.LinkedHashMap; import java.util.List; @@ -61,7 +62,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode { /** Get static partition key value pair as strings. * - * <p>Caution that we use {@link SqlLiteral#toString()} to get + * <p>For character literals we return the unquoted and unescaped values. + * For other types we use {@link SqlLiteral#toString()} to get * the string format of the value literal. If the string format is not * what you need, use {@link #getStaticPartitions()}. * @@ -75,7 +77,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode { } for (SqlNode node : this.staticPartitions.getList()) { SqlProperty sqlProperty = (SqlProperty) node; - String value = SqlLiteral.value(sqlProperty.getValue()).toString(); + Comparable comparable = SqlLiteral.value(sqlProperty.getValue()); + String value = comparable instanceof NlsString ? ((NlsString) comparable).getValue() : comparable.toString(); ret.put(sqlProperty.getKey().getSimple(), value); } return ret;