This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 07dfffe  [FLINK-13814][hive] HiveTableSink should strip quotes from partition values
07dfffe is described below

commit 07dfffec8248788630bff5e99afe9866d8b50487
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Aug 22 14:53:26 2019 +0800

    [FLINK-13814][hive] HiveTableSink should strip quotes from partition values
    
    Strip quotes from partition value in order to get proper string values.
    
    This closes #9502.
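Illustration (not part of this commit; the class name and literal value below are
made up for the example): for a SQL character literal, Calcite's SqlLiteral#toString()
keeps the SQL quoting and escaping, while NlsString#getValue() yields the plain Java
string, which is what Hive expects as a partition value.

    // Hedged sketch of the underlying Calcite behaviour, not code from the patch.
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.calcite.util.NlsString;

    public class PartitionValueSketch {
        public static void main(String[] args) {
            // The SQL literal '1''1', i.e. a quoted string whose content is 1'1.
            SqlLiteral literal = SqlLiteral.createCharString("1'1", SqlParserPos.ZERO);

            Comparable comparable = SqlLiteral.value(literal);
            // toString() keeps the SQL form (possibly with a charset prefix), e.g. '1''1'.
            System.out.println(comparable);
            // getValue() returns the unquoted, unescaped value: 1'1
            System.out.println(((NlsString) comparable).getValue());
        }
    }

With the quoted form, a partition would be created with a value like '1''1' instead
of 1'1, which is what the new testStaticPartition case below guards against.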
---
 .../flink/connectors/hive/HiveTableSinkTest.java   | 57 ----------------------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 52 ++++++++++++++++++++
 .../apache/flink/sql/parser/dml/RichSqlInsert.java |  7 ++-
 3 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
index 13bddc0..51a56fb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -41,7 +40,6 @@ import org.apache.flink.types.Row;
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -107,31 +105,6 @@ public class HiveTableSinkTest {
        }
 
        @Test
-       public void testInsertIntoDynamicPartition() throws Exception {
-               String dbName = "default";
-               String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
-               ObjectPath tablePath = new ObjectPath(dbName, tblName);
-
-               TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
-
-               List<Row> toWrite = generateRecords(5);
-               Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
-               tableEnv.registerTable("src", src);
-
-               tableEnv.registerCatalog("hive", hiveCatalog);
-               tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
-               tableEnv.execute("mytest");
-
-               List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
-               assertEquals(toWrite.size(), partitionSpecs.size());
-
-               verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
-
-               hiveCatalog.dropTable(tablePath, false);
-       }
-
-       @Test
        public void testWriteComplexType() throws Exception {
                String dbName = "default";
                String tblName = "dest";
@@ -213,36 +186,6 @@ public class HiveTableSinkTest {
                hiveCatalog.dropTable(tablePath, false);
        }
 
-       @Test
-       public void testInsertIntoStaticPartition() throws Exception {
-               String dbName = "default";
-               String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
-               ObjectPath tablePath = new ObjectPath(dbName, tblName);
-
-               TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
-               List<Row> toWrite = generateRecords(1);
-               Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
-               tableEnv.registerTable("src", src);
-
-               Map<String, String> partSpec = new HashMap<>();
-               partSpec.put("s", "a");
-
-               CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
-               HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
-               hiveTableSink.setStaticPartition(partSpec);
-               tableEnv.registerTableSink("destSink", hiveTableSink);
-               tableEnv.sqlQuery("select * from src").insertInto("destSink");
-               tableEnv.execute("mytest");
-
-               // make sure new partition is created
-               assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-
-               verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
-
-               hiveCatalog.dropTable(tablePath, false);
-       }
-
        private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
                CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
                hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index ebf9901..07dd674 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -188,6 +189,57 @@ public class TableEnvHiveConnectorTest {
                }
        }
 
+       @Test
+       public void testStaticPartition() throws Exception {
+               hiveShell.execute("create database db1");
+               try {
+                       hiveShell.execute("create table db1.src (x int)");
+                       hiveShell.insertInto("db1", "src").addRow(1).addRow(2).commit();
+                       hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
+                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.sqlUpdate("insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
+                       tableEnv.execute("static partitioning");
+                       assertEquals(1, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
+                       verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
+               } finally {
+                       hiveShell.execute("drop database db1 cascade");
+               }
+       }
+
+       @Test
+       public void testDynamicPartition() throws Exception {
+               hiveShell.execute("create database db1");
+               try {
+                       hiveShell.execute("create table db1.src (x int, y string, z double)");
+                       hiveShell.insertInto("db1", "src").addRow(1, "a", 1.1).addRow(2, "a", 2.2).addRow(3, "b", 3.3).commit();
+                       hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
+                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+                       tableEnv.execute("dynamic partitioning");
+                       assertEquals(3, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
+                       verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
+               } finally {
+                       hiveShell.execute("drop database db1 cascade");
+               }
+       }
+
+       @Test
+       public void testPartialDynamicPartition() throws Exception {
+               hiveShell.execute("create database db1");
+               try {
+                       hiveShell.execute("create table db1.src (x int, y string)");
+                       hiveShell.insertInto("db1", "src").addRow(1, "a").addRow(2, "b").commit();
+                       hiveShell.execute("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
+                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.sqlUpdate("insert into db1.dest partition (p1=1.1) select x,y from db1.src");
+                       tableEnv.execute("partial dynamic partitioning");
+                       assertEquals(2, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
+                       verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
+               } finally {
+                       hiveShell.execute("drop database db1 cascade");
+               }
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
index d9df258..4b681b2 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -61,7 +62,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
 
        /** Get static partition key value pair as strings.
         *
-        * <p>Caution that we use {@link SqlLiteral#toString()} to get
+        * <p>For character literals we return the unquoted and unescaped values.
+        * For other types we use {@link SqlLiteral#toString()} to get
         * the string format of the value literal. If the string format is not
         * what you need, use {@link #getStaticPartitions()}.
         *
@@ -75,7 +77,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
                }
                for (SqlNode node : this.staticPartitions.getList()) {
                        SqlProperty sqlProperty = (SqlProperty) node;
-                       String value = SqlLiteral.value(sqlProperty.getValue()).toString();
+                       Comparable comparable = SqlLiteral.value(sqlProperty.getValue());
+                       String value = comparable instanceof NlsString ? ((NlsString) comparable).getValue() : comparable.toString();
                        ret.put(sqlProperty.getKey().getSimple(), value);
                }
                return ret;
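
A hedged usage sketch for the RichSqlInsert change above (the parser setup and the
statement are assumptions and may need adjusting per Flink version; the statement
mirrors the new testStaticPartition case):

    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.dml.RichSqlInsert;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public class StaticPartitionKVsSketch {
        public static void main(String[] args) throws Exception {
            SqlParser parser = SqlParser.create(
                    "insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src",
                    SqlParser.configBuilder().setParserFactory(FlinkSqlParserImpl.FACTORY).build());
            RichSqlInsert insert = (RichSqlInsert) parser.parseStmt();
            // With this change the character literal comes back unquoted: {p1=1'1, p2=1.1}.
            // Non-character literals such as p2 still go through SqlLiteral#toString().
            System.out.println(insert.getStaticPartitionKVs());
        }
    }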
