This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e59a9d  [FLINK-13157] Re-enable unit test reading complex types in HiveInputFormatTest
0e59a9d is described below

commit 0e59a9d5f6585999f36e222ed38b63d82e81ce38
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 9 18:30:07 2019 +0800

    [FLINK-13157] Re-enable unit test reading complex types in HiveInputFormatTest
    
    This closes #9037.
---
 .../functions/hive/conversion/HiveInspectors.java  |   8 +-
 .../batch/connectors/hive/HiveInputFormatTest.java | 106 +++++++++++----------
 2 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 2ef5ee0..6d82d38 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -338,18 +339,17 @@ public class HiveInspectors {
 
                        Row row = new Row(fields.size());
                        // StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
-                       if (!data.getClass().isArray() && !(data instanceof List)) {
+                       if (!data.getClass().isArray() && !(data instanceof List) && (inspector instanceof StandardStructObjectInspector)) {
                                data = new Object[]{data};
                        }
                        for (int i = 0; i < row.getArity(); i++) {
                                row.setField(
                                        i,
                                        toFlinkObject(
-                                               fields.get(i).getFieldObjectInspector(),
-                                               structInspector.getStructFieldData(data, fields.get(i)))
+                                                       fields.get(i).getFieldObjectInspector(),
+                                                       structInspector.getStructFieldData(data, fields.get(i)))
                                );
                        }
-
                        return row;
                }
 
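The guard added in this hunk narrows the scalar-wrapping workaround. In Hive 1.2.1, StandardStructObjectInspector.getStructFieldData accepts only an array or a List, so a lone scalar has to be boxed into a one-element array; other StructObjectInspector implementations (such as the lazy inspectors LazySimpleSerDe produces for the complex-type test below) hand back their own struct objects and must receive the data untouched. A minimal sketch of the intended behavior, using the same names as the hunk above:

    // Box a scalar only for StandardStructObjectInspector, whose
    // getStructFieldData (Hive 1.2.1) accepts just arrays or Lists;
    // lazy struct inspectors pass their data through unchanged.
    if (!data.getClass().isArray()
            && !(data instanceof List)
            && inspector instanceof StandardStructObjectInspector) {
        data = new Object[]{data};
    }
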
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5d4d302..fea1469 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -102,8 +106,8 @@ public class HiveInputFormatTest {
                sd.setSerdeInfo(new SerDeInfo());
                sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
                sd.getSerdeInfo().setParameters(new HashMap<>());
-               sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-               sd.getSerdeInfo().getParameters().put("field.delim", ",");
+               sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+               sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
                sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
                tbl.setSd(sd);
                tbl.setPartitionKeys(new ArrayList<>());
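Replacing the hard-coded parameter keys with org.apache.hadoop.hive.serde.serdeConstants is more than cosmetic: as the removed comment in the next hunk notes, Hive 1.2.x spells the collection-delimiter key "colelction.delim", and LazySimpleSerDe reads that misspelled key, so typing the "correct" spelling by hand would be silently ignored. A quick way to see the actual key strings, assuming a Hive 1.2.x serde jar on the classpath:

    import org.apache.hadoop.hive.serde.serdeConstants;

    public class SerdeKeys {
        public static void main(String[] args) {
            System.out.println(serdeConstants.SERIALIZATION_FORMAT); // serialization.format
            System.out.println(serdeConstants.FIELD_DELIM);          // field.delim
            System.out.println(serdeConstants.MAPKEY_DELIM);         // mapkey.delim
            System.out.println(serdeConstants.COLLECTION_DELIM);     // colelction.delim (historical typo)
        }
    }
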
@@ -125,54 +129,52 @@ public class HiveInputFormatTest {
                Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
        }
 
-//     @Test
-//     public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
-//             final String dbName = "default";
-//             final String tblName = "complext_test";
-//
-//             TableSchema.Builder builder = new TableSchema.Builder();
-//             builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
-//                             DataTypes.ARRAY(DataTypes.INT()),
-//                             DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
-//                             DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-//
-//             //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-//             //serDe temporarily.
-//             HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-//             org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-//             tbl.setDbName(dbName);
-//             tbl.setTableName(tblName);
-//             tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-//             tbl.setParameters(new HashMap<>());
-//             StorageDescriptor sd = new StorageDescriptor();
-//             String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
-//             sd.setLocation(location);
-//             sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-//             sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-//             sd.setSerdeInfo(new SerDeInfo());
-//             sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-//             sd.getSerdeInfo().setParameters(new HashMap<>());
-//             sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-//             sd.getSerdeInfo().getParameters().put("field.delim", ";");
-//             //org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe use 'colelction.delim' as a delimiter config key
-//             // it may be a typo of this class
-//             sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
-//             sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
-//             sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
-//             tbl.setSd(sd);
-//             tbl.setPartitionKeys(new ArrayList<>());
-//
-//             client.createTable(tbl);
-//             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//             env.setParallelism(1);
-//             RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
-//             List<HiveTablePartition> partitions = new ArrayList<>();
-//             partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-//             HiveTableInputFormat hiveTableInputFormat =
-//                     new HiveTableInputFormat(new JobConf(hiveConf), hiveCatalog., partitions);
-//             DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-//             List<Row> rows = rowDataSet.collect();
-//             Assert.assertEquals(1, rows.size());
-//             Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
-//     }
+       @Test
+       public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
+               final String dbName = "default";
+               final String tblName = "complext_test";
+
+               TableSchema.Builder builder = new TableSchema.Builder();
+               builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+                               DataTypes.ARRAY(DataTypes.INT()),
+                               DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+                               DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+               //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
+               //serDe temporarily.
+               HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+               org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+               tbl.setDbName(dbName);
+               tbl.setTableName(tblName);
+               tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+               tbl.setParameters(new HashMap<>());
+               StorageDescriptor sd = new StorageDescriptor();
+               String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+               sd.setLocation(location);
+               sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+               sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+               sd.setSerdeInfo(new SerDeInfo());
+               sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+               sd.getSerdeInfo().setParameters(new HashMap<>());
+               sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+               sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
+               sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
+               sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
+               sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+               tbl.setSd(sd);
+               tbl.setPartitionKeys(new ArrayList<>());
+
+               client.createTable(tbl);
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               List<HiveTablePartition> partitions = new ArrayList<>();
+               partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+               CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
+               HiveTableInputFormat hiveTableInputFormat =
+                       new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
+               DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+               List<Row> rows = rowDataSet.collect();
+               Assert.assertEquals(1, rows.size());
+               Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+       }
 }
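For reference, with the delimiters configured in the re-enabled test (field ';', collection ',', map key ':'), the single record in the /complex_test resource is presumably the line 1,2,3;1:a,2:b;3,c; that is an inference from the asserted output, not the actual file contents. A small sketch of how the expected string follows from Flink's Row formatting (LinkedHashMap only pins the map's printing order):

    import org.apache.flink.types.Row;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ExpectedRowSketch {
        public static void main(String[] args) {
            Map<Integer, String> m = new LinkedHashMap<>();
            m.put(1, "a");
            m.put(2, "b");
            Row row = new Row(3);
            row.setField(0, new Integer[]{1, 2, 3}); // ARRAY<INT>       -> "[1, 2, 3]"
            row.setField(1, m);                      // MAP<INT, STRING> -> "{1=a, 2=b}"
            row.setField(2, Row.of(3, "c"));         // ROW<f1, f2>      -> "3,c"
            System.out.println(row);                 // [1, 2, 3],{1=a, 2=b},3,c
        }
    }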
