This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 0e59a9d  [FLINK-13157]reeanble unit test read complext type of HiveInputFormatTest
0e59a9d is described below

commit 0e59a9d5f6585999f36e222ed38b63d82e81ce38
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 9 18:30:07 2019 +0800

    [FLINK-13157]reeanble unit test read complext type of HiveInputFormatTest

    This closes #9037.
---
 .../functions/hive/conversion/HiveInspectors.java  |   8 +-
 .../batch/connectors/hive/HiveInputFormatTest.java | 106 +++++++++++----------
 2 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 2ef5ee0..6d82d38 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -338,18 +339,17 @@ public class HiveInspectors {
 			Row row = new Row(fields.size());
 			// StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
-			if (!data.getClass().isArray() && !(data instanceof List)) {
+			if (!data.getClass().isArray() && !(data instanceof List) && (inspector instanceof StandardStructObjectInspector)) {
 				data = new Object[]{data};
 			}
 			for (int i = 0; i < row.getArity(); i++) {
 				row.setField(
 					i,
 					toFlinkObject(
-						fields.get(i).getFieldObjectInspector(),
-						structInspector.getStructFieldData(data, fields.get(i)))
+						fields.get(i).getFieldObjectInspector(),
+						structInspector.getStructFieldData(data, fields.get(i)))
 				);
 			}
-
 			return row;
 		}
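The guard added in the hunk above matters because StandardStructObjectInspector expects struct data shaped as an Object[] or a List, while other struct inspectors (for example the lazy one produced by LazySimpleSerDe) hand back structs in their own internal representation, which must be passed through unchanged. A minimal standalone sketch of the wrapped case, assuming Hive's serde2 classes on the classpath (the class name StructInspectorSketch is invented for illustration, not part of the patch):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class StructInspectorSketch {
	public static void main(String[] args) {
		// A standard struct inspector for a single int field "f1".
		StandardStructObjectInspector inspector =
			ObjectInspectorFactory.getStandardStructObjectInspector(
				Arrays.asList("f1"),
				Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaIntObjectInspector));

		// A bare value, neither array nor List: the case the patched code wraps.
		Object data = Integer.valueOf(42);
		if (!data.getClass().isArray() && !(data instanceof List)
				&& inspector instanceof StandardStructObjectInspector) {
			data = new Object[]{data};
		}

		// Safe now that data is an Object[]; with the unwrapped Integer,
		// getStructFieldData would fail on Hive 1.2.1.
		StructField f1 = inspector.getAllStructFieldRefs().get(0);
		System.out.println(inspector.getStructFieldData(data, f1)); // prints 42
	}
}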
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5d4d302..fea1469 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.JobConf;

 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -102,8 +106,8 @@ public class HiveInputFormatTest {
 		sd.setSerdeInfo(new SerDeInfo());
 		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
 		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
 		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
 		tbl.setSd(sd);
 		tbl.setPartitionKeys(new ArrayList<>());
@@ -125,54 +129,52 @@
 		Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
 	}

-//	@Test
-//	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
-//		final String dbName = "default";
-//		final String tblName = "complext_test";
-//
-//		TableSchema.Builder builder = new TableSchema.Builder();
-//		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
-//				DataTypes.ARRAY(DataTypes.INT()),
-//				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
-//				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-//
-//		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-//		//serDe temporarily.
-//		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-//		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-//		tbl.setDbName(dbName);
-//		tbl.setTableName(tblName);
-//		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-//		tbl.setParameters(new HashMap<>());
-//		StorageDescriptor sd = new StorageDescriptor();
-//		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
-//		sd.setLocation(location);
-//		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-//		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-//		sd.setSerdeInfo(new SerDeInfo());
-//		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-//		sd.getSerdeInfo().setParameters(new HashMap<>());
-//		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-//		sd.getSerdeInfo().getParameters().put("field.delim", ";");
-//		//org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe use 'colelction.delim' as a delimiter config key
-//		// it may be a typo of this class
-//		sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
-//		sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
-//		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
-//		tbl.setSd(sd);
-//		tbl.setPartitionKeys(new ArrayList<>());
-//
-//		client.createTable(tbl);
-//		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//		env.setParallelism(1);
-//		RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
-//		List<HiveTablePartition> partitions = new ArrayList<>();
-//		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-//		HiveTableInputFormat hiveTableInputFormat =
-//				new HiveTableInputFormat(new JobConf(hiveConf), hiveCatalog., partitions);
-//		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-//		List<Row> rows = rowDataSet.collect();
-//		Assert.assertEquals(1, rows.size());
-//		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
-//	}
+	@Test
+	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
+		final String dbName = "default";
+		final String tblName = "complext_test";
+
+		TableSchema.Builder builder = new TableSchema.Builder();
+		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+				DataTypes.ARRAY(DataTypes.INT()),
+				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+		// Create the hive table via the metastore client rather than through HiveCatalog,
+		// because HiveCatalog doesn't support setting a custom serDe yet.
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+		tbl.setDbName(dbName);
+		tbl.setTableName(tblName);
+		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+		tbl.setParameters(new HashMap<>());
+		StorageDescriptor sd = new StorageDescriptor();
+		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+		sd.setLocation(location);
+		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+		sd.setSerdeInfo(new SerDeInfo());
+		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+		sd.getSerdeInfo().setParameters(new HashMap<>());
+		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
+		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+		tbl.setSd(sd);
+		tbl.setPartitionKeys(new ArrayList<>());
+
+		client.createTable(tbl);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		List<HiveTablePartition> partitions = new ArrayList<>();
+		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+		CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
+		HiveTableInputFormat hiveTableInputFormat =
+				new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
+		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+		List<Row> rows = rowDataSet.collect();
+		Assert.assertEquals(1, rows.size());
+		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+	}
 }
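A side note on the switch to serdeConstants in both tests: the constants resolve to the same string keys the old code hard-coded, and serdeConstants.COLLECTION_DELIM is literally the misspelled "colelction.delim" in the Hive versions Flink builds against, which is why the removed comment flagged it as a likely typo. A quick sanity sketch (the class name DelimiterSketch is invented for illustration, and the sample raw line is inferred from the asserted row rather than taken from the actual /complex_test resource):

import org.apache.hadoop.hive.serde.serdeConstants;

public class DelimiterSketch {
	public static void main(String[] args) {
		System.out.println(serdeConstants.SERIALIZATION_FORMAT); // serialization.format
		System.out.println(serdeConstants.FIELD_DELIM);          // field.delim
		System.out.println(serdeConstants.COLLECTION_DELIM);     // colelction.delim (sic)
		System.out.println(serdeConstants.MAPKEY_DELIM);         // mapkey.delim

		// With field delimiter ';', collection delimiter ',' and map-key delimiter ':',
		// the row asserted in the test ("[1, 2, 3],{1=a, 2=b},3,c") would come from a
		// raw text line like: 1,2,3;1:a,2:b;3,c
	}
}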