This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new f33a6f0  [FLINK-13534][hive] Unable to query Hive table with decimal column
f33a6f0 is described below

commit f33a6f0fd6d46d666d95707ac320115ada21cb0f
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 5 12:26:40 2019 +0800

    [FLINK-13534][hive] Unable to query Hive table with decimal column

    Fix the issue that Flink cannot access Hive table with decimal columns.

    This closes #9390.
---
 .../connectors/hive/HiveTableInputFormat.java      | 24 +++++------------
 .../connectors/hive/HiveTableOutputFormat.java     | 28 ++++++++++---------
 .../flink/connectors/hive/HiveTableSink.java       | 20 +++++---------
 .../flink/connectors/hive/HiveTableSource.java     |  8 +-----
 .../functions/hive/conversion/HiveInspectors.java  |  2 +-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 31 +++++++++++++++++++++-
 6 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
index 8965320..8a38fb3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
@@ -20,13 +20,9 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
@@ -64,8 +60,7 @@ import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
  * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
  * It's used to read from hive partition/non-partition table.
  */
-public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit>
-		implements ResultTypeQueryable {
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit> {
 
 	private static final long serialVersionUID = 6351448428766433164L;
 	private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
@@ -78,7 +73,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	protected transient boolean fetched = false;
 	protected transient boolean hasNext;
 
-	private RowTypeInfo rowTypeInfo;
+	// arity of each row, including partition columns
+	private int rowArity;
 
 	//Necessary info to init deserializer
 	private List<String> partitionColNames;
@@ -102,8 +98,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 
 		this.jobConf = new JobConf(jobConf);
 		this.partitionColNames = catalogTable.getPartitionKeys();
-		TableSchema tableSchema = catalogTable.getSchema();
-		this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		rowArity = catalogTable.getSchema().getFieldCount();
 	}
 
 	@Override
@@ -212,7 +207,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		if (reachedEnd()) {
 			return null;
 		}
-		Row row = new Row(rowTypeInfo.getArity());
+		Row row = new Row(rowArity);
 		try {
 			//Use HiveDeserializer to deserialize an object out of a Writable blob
 			Object hiveRowStruct = deserializer.deserialize(value);
@@ -234,11 +229,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		return row;
 	}
 
-	@Override
-	public TypeInformation getProducedType() {
-		return rowTypeInfo;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Custom serialization methods
 	// --------------------------------------------------------------------------------------------
@@ -246,7 +236,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		super.write(out);
 		jobConf.write(out);
-		out.writeObject(rowTypeInfo);
+		out.writeObject(rowArity);
 		out.writeObject(partitionColNames);
 		out.writeObject(partitions);
 	}
@@ -263,7 +253,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		if (currentUserCreds != null) {
 			jobConf.getCredentials().addAll(currentUserCreds);
 		}
-		rowTypeInfo = (RowTypeInfo) in.readObject();
+		rowArity = (int) in.readObject();
 		partitionColNames = (List<String>) in.readObject();
 		partitions = (List<HiveTablePartition>) in.readObject();
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 9e1ee46..ec0b41d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -38,7 +37,6 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -114,7 +112,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	private transient JobConf jobConf;
 	private transient ObjectPath tablePath;
 	private transient List<String> partitionColumns;
-	private transient RowTypeInfo rowTypeInfo;
+	// Ideally we should maintain a TableSchema here, but it's not Serializable
+	private transient String[] fieldNames;
+	private transient DataType[] fieldTypes;
 	private transient HiveTablePartition hiveTablePartition;
 	private transient Properties tableProperties;
 	private transient boolean overwrite;
@@ -159,7 +159,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		this.tablePath = tablePath;
 		this.partitionColumns = table.getPartitionKeys();
 		TableSchema tableSchema = table.getSchema();
-		this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		this.fieldNames = tableSchema.getFieldNames();
+		this.fieldTypes = tableSchema.getFieldDataTypes();
 		this.hiveTablePartition = hiveTablePartition;
 		this.tableProperties = tableProperties;
 		this.overwrite = overwrite;
@@ -177,7 +178,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		out.writeObject(isPartitioned);
 		out.writeObject(isDynamicPartition);
 		out.writeObject(overwrite);
-		out.writeObject(rowTypeInfo);
+		out.writeObject(fieldNames);
+		out.writeObject(fieldTypes);
 		out.writeObject(hiveTablePartition);
 		out.writeObject(partitionColumns);
 		out.writeObject(tablePath);
@@ -200,7 +202,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		isPartitioned = (boolean) in.readObject();
 		isDynamicPartition = (boolean) in.readObject();
 		overwrite = (boolean) in.readObject();
-		rowTypeInfo = (RowTypeInfo) in.readObject();
+		fieldNames = (String[]) in.readObject();
+		fieldTypes = (DataType[]) in.readObject();
 		hiveTablePartition = (HiveTablePartition) in.readObject();
 		partitionColumns = (List<String>) in.readObject();
 		tablePath = (ObjectPath) in.readObject();
@@ -286,26 +289,25 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		if (!isDynamicPartition) {
 			staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
 		} else {
-			dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
+			dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
 		}
 
-		numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+		numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
 		hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
 		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
 		for (int i = 0; i < numNonPartitionColumns; i++) {
-			DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
-			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
 			objectInspectors.add(objectInspector);
-			hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
+			hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
 		}
 
 		if (!isPartitioned) {
 			rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-				Arrays.asList(rowTypeInfo.getFieldNames()),
+				Arrays.asList(fieldNames),
 				objectInspectors);
 		} else {
 			rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-				Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
+				Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
 				objectInspectors);
 			defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
 				HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a36479d..2974de0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +31,7 @@ import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 	private final JobConf jobConf;
 	private final CatalogTable catalogTable;
 	private final ObjectPath tablePath;
-	private final RowTypeInfo rowTypeInfo;
+	private final TableSchema tableSchema;
 	private final String hiveVersion;
 
 	private Map<String, String> staticPartitionSpec = Collections.emptyMap();
@@ -77,8 +77,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 		this.catalogTable = table;
 		hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
 				"Hive version is not defined");
-		TableSchema tableSchema = table.getSchema();
-		rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		tableSchema = table.getSchema();
 	}
 
 	@Override
@@ -136,18 +135,13 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 	}
 
 	@Override
-	public String[] getFieldNames() {
-		return rowTypeInfo.getFieldNames();
+	public DataType getConsumedDataType() {
+		return getTableSchema().toRowDataType();
 	}
 
 	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return rowTypeInfo.getFieldTypes();
-	}
-
-	@Override
-	public TypeInformation<Row> getOutputType() {
-		return rowTypeInfo;
+	public TableSchema getTableSchema() {
+		return tableSchema;
 	}
 
 	// get a staging dir associated with a final dir
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 0563029..451bd93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -21,7 +21,6 @@ package org.apache.flink.connectors.hive;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -107,12 +106,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 
 	@Override
 	public DataType getProducedDataType() {
-		TableSchema tableSchema = catalogTable.getSchema();
-		DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
-		for (int i = 0; i < fields.length; i++) {
-			fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
-		}
-		return DataTypes.ROW(fields);
+		return getTableSchema().toRowDataType();
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 6d82d38..3546668 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -203,7 +203,7 @@ public class HiveInspectors {
 		} else if (inspector instanceof HiveVarcharObjectInspector) {
 			return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
 		} else if (inspector instanceof HiveDecimalObjectInspector) {
-			return o -> HiveDecimal.create((BigDecimal) o);
+			return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index e8c402a..0845eae 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -138,6 +138,33 @@ public class TableEnvHiveConnectorTest {
 		hiveShell.execute("drop database db1 cascade");
 	}
 
+	@Test
+	public void testDecimal() throws Exception {
+		hiveShell.execute("create database db1");
+		try {
+			hiveShell.execute("create table db1.src1 (x decimal(10,2))");
+			hiveShell.execute("create table db1.src2 (x decimal(10,2))");
+			hiveShell.execute("create table db1.dest (x decimal(10,2))");
+			// populate src1 from Hive
+			hiveShell.execute("insert into db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
+
+			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			// populate src2 with same data from Flink
+			tableEnv.sqlUpdate("insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
+					"(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
+			tableEnv.execute("test1");
+			// verify src1 and src2 contain same data
+			verifyHiveQueryResult("select * from db1.src2", hiveShell.executeQuery("select * from db1.src1"));
+
+			// populate dest with src1 from Flink -- to test reading decimal type from Hive
+			tableEnv.sqlUpdate("insert into db1.dest select * from db1.src1");
+			tableEnv.execute("test2");
+			verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
+		} finally {
+			hiveShell.execute("drop database db1 cascade");
+		}
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -146,6 +173,8 @@ public class TableEnvHiveConnectorTest {
 	}
 
 	private void verifyHiveQueryResult(String query, List<String> expected) {
-		assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+		List<String> results = hiveShell.executeQuery(query);
+		assertEquals(expected.size(), results.size());
+		assertEquals(new HashSet<>(expected), new HashSet<>(results));
 	}
 }
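For readers who only want the gist of the patch rather than the full diff: the source and sink now derive their produced/consumed types straight from the catalog TableSchema (getTableSchema().toRowDataType()) instead of going through a legacy RowTypeInfo, and the write-side decimal conversion in HiveInspectors no longer hands null values to HiveDecimal.create. Below is a minimal, self-contained sketch of that null guard in isolation; it is illustrative only and not part of the commit. It assumes Hive's HiveDecimal is on the classpath, and the class name DecimalConversionSketch plus the plain java.util.function.Function standing in for the connector's HiveObjectConversion are invented for the example.

// Illustrative sketch only -- not the committed code. Assumes Hive's
// hive-common (HiveDecimal) dependency is available.
import org.apache.hadoop.hive.common.type.HiveDecimal;

import java.math.BigDecimal;
import java.util.function.Function;

public class DecimalConversionSketch {

	// Null-safe Flink -> Hive decimal conversion, mirroring the fixed lambda
	// in HiveInspectors#getConversion: only non-null values are wrapped.
	static Function<Object, Object> decimalConversion() {
		return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
	}

	public static void main(String[] args) {
		Function<Object, Object> conversion = decimalConversion();
		// A regular value is wrapped into a HiveDecimal.
		System.out.println(conversion.apply(new BigDecimal("123456789.12")));
		// A SQL NULL is passed through unchanged instead of reaching HiveDecimal.create.
		System.out.println(conversion.apply(null));
	}
}

The new testDecimal case above covers both directions end to end: writing decimals from Flink into a Hive table and reading decimals that Hive itself wrote.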