This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f33a6f0  [FLINK-13534][hive] Unable to query Hive table with decimal column
f33a6f0 is described below

commit f33a6f0fd6d46d666d95707ac320115ada21cb0f
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 5 12:26:40 2019 +0800

    [FLINK-13534][hive] Unable to query Hive table with decimal column
    
    Fix the issue that Flink cannot access Hive tables with decimal columns.
    
    This closes #9390.
---
 .../connectors/hive/HiveTableInputFormat.java      | 24 +++++------------
 .../connectors/hive/HiveTableOutputFormat.java     | 28 ++++++++++---------
 .../flink/connectors/hive/HiveTableSink.java       | 20 +++++---------
 .../flink/connectors/hive/HiveTableSource.java     |  8 +-----
 .../functions/hive/conversion/HiveInspectors.java  |  2 +-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 31 +++++++++++++++++++++-
 6 files changed, 61 insertions(+), 52 deletions(-)
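
For context on the decimal-specific part of this change: the hunk in HiveInspectors below adds a null guard before converting a value to Hive's HiveDecimal. A minimal, self-contained sketch of that pattern (the class and method names here are illustrative only, not part of the commit; it assumes hive-common is on the classpath):

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    import java.math.BigDecimal;

    // Illustrative sketch: mirrors the null guard added in HiveInspectors,
    // so a null decimal cell is passed through rather than handed to
    // HiveDecimal.create.
    public class DecimalConversionSketch {

        static HiveDecimal toHiveDecimal(BigDecimal value) {
            return value == null ? null : HiveDecimal.create(value);
        }

        public static void main(String[] args) {
            System.out.println(toHiveDecimal(new BigDecimal("123456789.12"))); // 123456789.12
            System.out.println(toHiveDecimal(null));                           // null
        }
    }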

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
index 8965320..8a38fb3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
@@ -20,13 +20,9 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
@@ -64,8 +60,7 @@ import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
  * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
  * It's used to read from hive partition/non-partition table.
  */
-public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit>
-               implements ResultTypeQueryable {
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit> {
        private static final long serialVersionUID = 6351448428766433164L;
        private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
 
@@ -78,7 +73,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
        protected transient boolean fetched = false;
        protected transient boolean hasNext;
 
-       private RowTypeInfo rowTypeInfo;
+       // arity of each row, including partition columns
+       private int rowArity;
 
        //Necessary info to init deserializer
        private List<String> partitionColNames;
@@ -102,8 +98,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 
                this.jobConf = new JobConf(jobConf);
                this.partitionColNames = catalogTable.getPartitionKeys();
-               TableSchema tableSchema = catalogTable.getSchema();
-               TableSchema tableSchema = catalogTable.getSchema();
-               this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+               rowArity = catalogTable.getSchema().getFieldCount();
        }
 
        @Override
@@ -212,7 +207,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
                if (reachedEnd()) {
                        return null;
                }
-               Row row = new Row(rowTypeInfo.getArity());
+               Row row = new Row(rowArity);
                try {
                        //Use HiveDeserializer to deserialize an object out of a Writable blob
                        Object hiveRowStruct = deserializer.deserialize(value);
@@ -234,11 +229,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
                return row;
        }
 
-       @Override
-       public TypeInformation getProducedType() {
-               return rowTypeInfo;
-       }
-
        // --------------------------------------------------------------------------------------------
        //  Custom serialization methods
        // --------------------------------------------------------------------------------------------
@@ -246,7 +236,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
        private void writeObject(ObjectOutputStream out) throws IOException {
                super.write(out);
                jobConf.write(out);
-               out.writeObject(rowTypeInfo);
+               out.writeObject(rowArity);
                out.writeObject(partitionColNames);
                out.writeObject(partitions);
        }
@@ -263,7 +253,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
                if (currentUserCreds != null) {
                        jobConf.getCredentials().addAll(currentUserCreds);
                }
-               rowTypeInfo = (RowTypeInfo) in.readObject();
+               rowArity = (int) in.readObject();
                partitionColNames = (List<String>) in.readObject();
                partitions = (List<HiveTablePartition>) in.readObject();
        }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 9e1ee46..ec0b41d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -38,7 +37,6 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -114,7 +112,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        private transient JobConf jobConf;
        private transient ObjectPath tablePath;
        private transient List<String> partitionColumns;
-       private transient RowTypeInfo rowTypeInfo;
+       // Ideally we should maintain a TableSchema here, but it's not Serializable
+       private transient String[] fieldNames;
+       private transient DataType[] fieldTypes;
        private transient HiveTablePartition hiveTablePartition;
        private transient Properties tableProperties;
        private transient boolean overwrite;
@@ -159,7 +159,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                this.tablePath = tablePath;
                this.partitionColumns = table.getPartitionKeys();
                TableSchema tableSchema = table.getSchema();
-               this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+               this.fieldNames = tableSchema.getFieldNames();
+               this.fieldTypes = tableSchema.getFieldDataTypes();
                this.hiveTablePartition = hiveTablePartition;
                this.tableProperties = tableProperties;
                this.overwrite = overwrite;
@@ -177,7 +178,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                out.writeObject(isPartitioned);
                out.writeObject(isDynamicPartition);
                out.writeObject(overwrite);
-               out.writeObject(rowTypeInfo);
+               out.writeObject(fieldNames);
+               out.writeObject(fieldTypes);
                out.writeObject(hiveTablePartition);
                out.writeObject(partitionColumns);
                out.writeObject(tablePath);
@@ -200,7 +202,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                isPartitioned = (boolean) in.readObject();
                isDynamicPartition = (boolean) in.readObject();
                overwrite = (boolean) in.readObject();
-               rowTypeInfo = (RowTypeInfo) in.readObject();
+               fieldNames = (String[]) in.readObject();
+               fieldTypes = (DataType[]) in.readObject();
                hiveTablePartition = (HiveTablePartition) in.readObject();
                partitionColumns = (List<String>) in.readObject();
                tablePath = (ObjectPath) in.readObject();
@@ -286,26 +289,25 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                if (!isDynamicPartition) {
                        staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
                } else {
-                       dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
+                       dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
                }
 
-               numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+               numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
                hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
                List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
                for (int i = 0; i < numNonPartitionColumns; i++) {
-                       DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
-                       ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+                       ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
                        objectInspectors.add(objectInspector);
-                       hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
+                       hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
                }
 
                if (!isPartitioned) {
                        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-                               Arrays.asList(rowTypeInfo.getFieldNames()),
+                               Arrays.asList(fieldNames),
                                objectInspectors);
                } else {
                        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-                               Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
+                               Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
                                objectInspectors);
                        defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                                        HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a36479d..2974de0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +31,7 @@ import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
        private final JobConf jobConf;
        private final CatalogTable catalogTable;
        private final ObjectPath tablePath;
-       private final RowTypeInfo rowTypeInfo;
+       private final TableSchema tableSchema;
        private final String hiveVersion;
 
        private Map<String, String> staticPartitionSpec = Collections.emptyMap();
@@ -77,8 +77,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
                this.catalogTable = table;
                hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
                                "Hive version is not defined");
-               TableSchema tableSchema = table.getSchema();
-               rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+               tableSchema = table.getSchema();
        }
 
        @Override
@@ -136,18 +135,13 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
        }
 
        @Override
-       public String[] getFieldNames() {
-               return rowTypeInfo.getFieldNames();
+       public DataType getConsumedDataType() {
+               return getTableSchema().toRowDataType();
        }
 
        @Override
-       public TypeInformation<?>[] getFieldTypes() {
-               return rowTypeInfo.getFieldTypes();
-       }
-
-       @Override
-       public TypeInformation<Row> getOutputType() {
-               return rowTypeInfo;
+       public TableSchema getTableSchema() {
+               return tableSchema;
        }
 
        // get a staging dir associated with a final dir
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 0563029..451bd93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -21,7 +21,6 @@ package org.apache.flink.connectors.hive;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -107,12 +106,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 
        @Override
        public DataType getProducedDataType() {
-               TableSchema tableSchema = catalogTable.getSchema();
-               DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
-               for (int i = 0; i < fields.length; i++) {
-                       fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
-               }
-               return DataTypes.ROW(fields);
+               return getTableSchema().toRowDataType();
        }
 
        @Override
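
Both HiveTableSink and HiveTableSource above now derive their row type directly from the catalog table's TableSchema via toRowDataType(), instead of going through the legacy RowTypeInfo path. A hedged sketch of what that call yields for a DECIMAL(10,2) column (the builder usage and class name are shown for illustration only; the printed form may vary by Flink version):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;

    // Illustrative sketch: a one-column schema with DECIMAL(10, 2) and the
    // row DataType derived from it, which keeps the column's precision and
    // scale.
    public class DecimalSchemaSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                    .field("x", DataTypes.DECIMAL(10, 2))
                    .build();
            DataType rowType = schema.toRowDataType();
            System.out.println(rowType); // e.g. ROW<`x` DECIMAL(10, 2)>
        }
    }
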
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 6d82d38..3546668 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -203,7 +203,7 @@ public class HiveInspectors {
                        } else if (inspector instanceof HiveVarcharObjectInspector) {
                                return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
                        } else if (inspector instanceof HiveDecimalObjectInspector) {
-                               return o -> HiveDecimal.create((BigDecimal) o);
+                               return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
                        }
                }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index e8c402a..0845eae 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -138,6 +138,33 @@ public class TableEnvHiveConnectorTest {
                hiveShell.execute("drop database db1 cascade");
        }
 
+       @Test
+       public void testDecimal() throws Exception {
+               hiveShell.execute("create database db1");
+               try {
+                       hiveShell.execute("create table db1.src1 (x decimal(10,2))");
+                       hiveShell.execute("create table db1.src2 (x decimal(10,2))");
+                       hiveShell.execute("create table db1.dest (x decimal(10,2))");
+                       // populate src1 from Hive
+                       hiveShell.execute("insert into db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
+
+                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       // populate src2 with same data from Flink
+                       tableEnv.sqlUpdate("insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
+                                       "(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
+                       tableEnv.execute("test1");
+                       // verify src1 and src2 contain same data
+                       verifyHiveQueryResult("select * from db1.src2", hiveShell.executeQuery("select * from db1.src1"));
+
+                       // populate dest with src1 from Flink -- to test reading decimal type from Hive
+                       tableEnv.sqlUpdate("insert into db1.dest select * from db1.src1");
+                       tableEnv.execute("test2");
+                       verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
+               } finally {
+                       hiveShell.execute("drop database db1 cascade");
+               }
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -146,6 +173,8 @@ public class TableEnvHiveConnectorTest {
        }
 
        private void verifyHiveQueryResult(String query, List<String> expected) {
-               assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+               List<String> results = hiveShell.executeQuery(query);
+               assertEquals(expected.size(), results.size());
+               assertEquals(new HashSet<>(expected), new HashSet<>(results));
        }
 }
