HIVE-12619: Switching the field order within an array of structs causes the query to fail (Mohammad and Jimmy, reviewed by Sergio)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b431c278 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b431c278 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b431c278 Branch: refs/heads/llap Commit: b431c2788cd37fc788acd48beaf31c403361c1f0 Parents: 20a8192 Author: Jimmy Xiang <jxi...@apache.org> Authored: Thu Mar 10 10:32:57 2016 -0800 Committer: Jimmy Xiang <jxi...@apache.org> Committed: Tue Mar 29 19:47:18 2016 -0700 ---------------------------------------------------------------------- .../io/parquet/convert/HiveSchemaConverter.java | 10 +-- .../parquet/read/DataWritableReadSupport.java | 75 ++++++++++++-------- .../ql/io/parquet/serde/ParquetHiveSerDe.java | 11 +-- .../clientpositive/parquet_schema_evolution.q | 14 ++++ .../parquet_map_null.q.java1.8.out | 1 + .../parquet_schema_evolution.q.out | 65 +++++++++++++++++ .../clientpositive/parquet_type_promotion.q.out | 2 +- 7 files changed, 131 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java index b01f21f..40f6256 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java @@ -24,12 +24,10 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; @@ -120,9 +118,10 @@ public class HiveSchemaConverter { // An optional group containing a repeated anonymous group "bag", containing // 1 anonymous element "array_element" + @SuppressWarnings("deprecation") private static GroupType convertArrayType(final String name, final ListTypeInfo typeInfo) { final TypeInfo subType = typeInfo.getListElementTypeInfo(); - return listWrapper(name, OriginalType.LIST, new GroupType(Repetition.REPEATED, + return new GroupType(Repetition.OPTIONAL, name, OriginalType.LIST, new GroupType(Repetition.REPEATED, ParquetHiveSerDe.ARRAY.toString(), convertType("array_element", subType))); } @@ -143,9 +142,4 @@ public class HiveSchemaConverter { typeInfo.getMapValueTypeInfo()); return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType); } - - private static GroupType listWrapper(final String name, final OriginalType originalType, - final GroupType groupType) { - return new GroupType(Repetition.OPTIONAL, name, originalType, groupType); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 53f3b72..3e38cc7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -23,24 +23,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.util.StringUtils; - import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import org.apache.parquet.schema.Types; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; /** * @@ -107,43 +111,58 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { private static List<Type> getProjectedGroupFields(GroupType schema, List<String> colNames, List<TypeInfo> colTypes) { List<Type> schemaTypes = new ArrayList<Type>(); - ListIterator columnIterator = colNames.listIterator(); + ListIterator<String> columnIterator = colNames.listIterator(); while (columnIterator.hasNext()) { TypeInfo colType = colTypes.get(columnIterator.nextIndex()); - String colName = (String) columnIterator.next(); + String colName = columnIterator.next(); Type fieldType = getFieldTypeIgnoreCase(schema, colName); - if (fieldType != null) { - if (colType.getCategory() == ObjectInspector.Category.STRUCT) { - if (fieldType.isPrimitive()) { - throw new IllegalStateException("Invalid schema data type, found: PRIMITIVE, expected: STRUCT"); - } - - GroupType groupFieldType = fieldType.asGroupType(); - - List<Type> groupFields = getProjectedGroupFields( - groupFieldType, - ((StructTypeInfo) colType).getAllStructFieldNames(), - ((StructTypeInfo) colType).getAllStructFieldTypeInfos() - ); - - Type[] typesArray = groupFields.toArray(new Type[0]); - schemaTypes.add(Types.buildGroup(groupFieldType.getRepetition()) - .addFields(typesArray) - .named(fieldType.getName()) - ); - } else { - schemaTypes.add(fieldType); - } - } else { - // Add type for schema evolution + if (fieldType == null) { schemaTypes.add(Types.optional(PrimitiveTypeName.BINARY).named(colName)); + } else { + schemaTypes.add(getProjectedType(colType, fieldType)); } } return schemaTypes; } + private static Type getProjectedType(TypeInfo colType, Type fieldType) { + switch (colType.getCategory()) { + case STRUCT: + List<Type> groupFields = getProjectedGroupFields( + fieldType.asGroupType(), + ((StructTypeInfo) colType).getAllStructFieldNames(), + ((StructTypeInfo) colType).getAllStructFieldTypeInfos() + ); + + Type[] typesArray = groupFields.toArray(new Type[0]); + return Types.buildGroup(fieldType.getRepetition()) + .addFields(typesArray) + .named(fieldType.getName()); + case LIST: + TypeInfo elemType = ((ListTypeInfo) colType).getListElementTypeInfo(); + if (elemType.getCategory() == ObjectInspector.Category.STRUCT) { + Type subFieldType = fieldType.asGroupType().getType(0); + if (!subFieldType.isPrimitive()) { + String subFieldName = subFieldType.getName(); + Text name = new Text(subFieldName); + if (name.equals(ParquetHiveSerDe.ARRAY) || name.equals(ParquetHiveSerDe.LIST)) { + subFieldType = new GroupType(Repetition.REPEATED, subFieldName, + getProjectedType(elemType, subFieldType.asGroupType().getType(0))); + } else { + subFieldType = getProjectedType(elemType, subFieldType); + } + return Types.buildGroup(Repetition.OPTIONAL).as(OriginalType.LIST).addFields( + subFieldType).named(fieldType.getName()); + } + } + break; + default: + } + return fieldType; + } + /** * Searchs column names by name on a given Parquet message schema, and returns its projected * Parquet schema types. http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java index e1bf8e2..995b965 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; - import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -37,7 +36,6 @@ import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.apache.parquet.hadoop.ParquetWriter; /** * @@ -51,10 +49,7 @@ public class ParquetHiveSerDe extends AbstractSerDe { public static final Text MAP_VALUE = new Text("value"); public static final Text MAP = new Text("map"); public static final Text ARRAY = new Text("bag"); - - // default compression type for parquet output format - private static final String DEFAULTCOMPRESSION = - ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name(); + public static final Text LIST = new Text("list"); // Map precision to the number bytes needed for binary conversion. public static final int PRECISION_TO_BYTE_COUNT[] = new int[38]; @@ -78,7 +73,6 @@ public class ParquetHiveSerDe extends AbstractSerDe { private LAST_OPERATION status; private long serializedSize; private long deserializedSize; - private String compressionType; private ParquetHiveRecord parquetRow; @@ -97,9 +91,6 @@ public class ParquetHiveSerDe extends AbstractSerDe { final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); - // Get compression properties - compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION); - if (columnNameProperty.length() == 0) { columnNames = new ArrayList<String>(); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/test/queries/clientpositive/parquet_schema_evolution.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parquet_schema_evolution.q b/ql/src/test/queries/clientpositive/parquet_schema_evolution.q index 193400f..e767b81 100644 --- a/ql/src/test/queries/clientpositive/parquet_schema_evolution.q +++ b/ql/src/test/queries/clientpositive/parquet_schema_evolution.q @@ -24,5 +24,19 @@ CREATE TABLE NewStructFieldTable STORED AS PARQUET AS SELECT * FROM NewStructFie DESCRIBE NewStructFieldTable; SELECT * FROM NewStructFieldTable; +-- test if the order of fields in array<struct<>> changes, it works fine + +DROP TABLE IF EXISTS schema_test; +CREATE TABLE schema_test (msg array<struct<f1: string, f2: string, a: array<struct<a1: string, a2: string>>, b: array<struct<b1: int, b2: int>>>>) STORED AS PARQUET; +INSERT INTO TABLE schema_test SELECT array(named_struct('f1', 'abc', 'f2', 'abc2', 'a', array(named_struct('a1', 'a1', 'a2', 'a2')), + 'b', array(named_struct('b1', 1, 'b2', 2)))) FROM NewStructField LIMIT 2; +SELECT * FROM schema_test; +set hive.metastore.disallow.incompatible.col.type.changes=false; +-- Order of fields swapped +ALTER TABLE schema_test CHANGE msg msg array<struct<a: array<struct<a2: string, a1: string>>, b: array<struct<b2: int, b1: int>>, f2: string, f1: string>>; +reset hive.metastore.disallow.incompatible.col.type.changes; +SELECT * FROM schema_test; + +DROP TABLE schema_test; DROP TABLE NewStructField; DROP TABLE NewStructFieldTable; http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/test/results/clientpositive/parquet_map_null.q.java1.8.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/parquet_map_null.q.java1.8.out b/ql/src/test/results/clientpositive/parquet_map_null.q.java1.8.out index dd541a5..1462cc2 100644 --- a/ql/src/test/results/clientpositive/parquet_map_null.q.java1.8.out +++ b/ql/src/test/results/clientpositive/parquet_map_null.q.java1.8.out @@ -38,6 +38,7 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@avro_table POSTHOOK: Output: database:default POSTHOOK: Output: default@parquet_table +POSTHOOK: Lineage: parquet_table.avreau_col_1 SIMPLE [(avro_table)avro_table.FieldSchema(name:avreau_col_1, type:map<string,string>, comment:), ] PREHOOK: query: SELECT * FROM parquet_table PREHOOK: type: QUERY PREHOOK: Input: default@parquet_table http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/test/results/clientpositive/parquet_schema_evolution.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/parquet_schema_evolution.q.out b/ql/src/test/results/clientpositive/parquet_schema_evolution.q.out index 0b88d84..07595d2 100644 --- a/ql/src/test/results/clientpositive/parquet_schema_evolution.q.out +++ b/ql/src/test/results/clientpositive/parquet_schema_evolution.q.out @@ -125,6 +125,71 @@ POSTHOOK: Input: default@newstructfieldtable {"a1":{"k1":"v1"},"a2":{"e1":5,"e2":null},"a3":null} NULL {"a1":{"k1":"v1"},"a2":{"e1":5,"e2":null},"a3":null} NULL {"a1":{"k1":"v1"},"a2":{"e1":5,"e2":null},"a3":null} NULL +PREHOOK: query: -- test if the order of fields in array<struct<>> changes, it works fine + +DROP TABLE IF EXISTS schema_test +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- test if the order of fields in array<struct<>> changes, it works fine + +DROP TABLE IF EXISTS schema_test +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE schema_test (msg array<struct<f1: string, f2: string, a: array<struct<a1: string, a2: string>>, b: array<struct<b1: int, b2: int>>>>) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@schema_test +POSTHOOK: query: CREATE TABLE schema_test (msg array<struct<f1: string, f2: string, a: array<struct<a1: string, a2: string>>, b: array<struct<b1: int, b2: int>>>>) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@schema_test +PREHOOK: query: INSERT INTO TABLE schema_test SELECT array(named_struct('f1', 'abc', 'f2', 'abc2', 'a', array(named_struct('a1', 'a1', 'a2', 'a2')), + 'b', array(named_struct('b1', 1, 'b2', 2)))) FROM NewStructField LIMIT 2 +PREHOOK: type: QUERY +PREHOOK: Input: default@newstructfield +PREHOOK: Output: default@schema_test +POSTHOOK: query: INSERT INTO TABLE schema_test SELECT array(named_struct('f1', 'abc', 'f2', 'abc2', 'a', array(named_struct('a1', 'a1', 'a2', 'a2')), + 'b', array(named_struct('b1', 1, 'b2', 2)))) FROM NewStructField LIMIT 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@newstructfield +POSTHOOK: Output: default@schema_test +POSTHOOK: Lineage: schema_test.msg EXPRESSION [] +PREHOOK: query: SELECT * FROM schema_test +PREHOOK: type: QUERY +PREHOOK: Input: default@schema_test +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM schema_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@schema_test +#### A masked pattern was here #### +[{"f1":"abc","f2":"abc2","a":[{"a1":"a1","a2":"a2"}],"b":[{"b1":1,"b2":2}]}] +[{"f1":"abc","f2":"abc2","a":[{"a1":"a1","a2":"a2"}],"b":[{"b1":1,"b2":2}]}] +PREHOOK: query: -- Order of fields swapped +ALTER TABLE schema_test CHANGE msg msg array<struct<a: array<struct<a2: string, a1: string>>, b: array<struct<b2: int, b1: int>>, f2: string, f1: string>> +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@schema_test +PREHOOK: Output: default@schema_test +POSTHOOK: query: -- Order of fields swapped +ALTER TABLE schema_test CHANGE msg msg array<struct<a: array<struct<a2: string, a1: string>>, b: array<struct<b2: int, b1: int>>, f2: string, f1: string>> +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@schema_test +POSTHOOK: Output: default@schema_test +PREHOOK: query: SELECT * FROM schema_test +PREHOOK: type: QUERY +PREHOOK: Input: default@schema_test +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM schema_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@schema_test +#### A masked pattern was here #### +[{"a":[{"a2":"a2","a1":"a1"}],"b":[{"b2":2,"b1":1}],"f2":"abc2","f1":"abc"}] +[{"a":[{"a2":"a2","a1":"a1"}],"b":[{"b2":2,"b1":1}],"f2":"abc2","f1":"abc"}] +PREHOOK: query: DROP TABLE schema_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@schema_test +PREHOOK: Output: default@schema_test +POSTHOOK: query: DROP TABLE schema_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@schema_test +POSTHOOK: Output: default@schema_test PREHOOK: query: DROP TABLE NewStructField PREHOOK: type: DROPTABLE PREHOOK: Input: default@newstructfield http://git-wip-us.apache.org/repos/asf/hive/blob/b431c278/ql/src/test/results/clientpositive/parquet_type_promotion.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/parquet_type_promotion.q.out b/ql/src/test/results/clientpositive/parquet_type_promotion.q.out index 55f9b27..91c3fff 100644 --- a/ql/src/test/results/clientpositive/parquet_type_promotion.q.out +++ b/ql/src/test/results/clientpositive/parquet_type_promotion.q.out @@ -233,7 +233,7 @@ POSTHOOK: query: SELECT * FROM arrays_of_struct_to_map POSTHOOK: type: QUERY POSTHOOK: Input: default@arrays_of_struct_to_map #### A masked pattern was here #### -[{"c1":1}] [{"f2":77}] +[{"c1":1}] [{"f2":88}] PREHOOK: query: -- Testing schema evolution of adding columns into array<struct<>> ALTER TABLE arrays_of_struct_to_map REPLACE COLUMNS (locations1 array<struct<c1:int,c2:int,c3:int>>, locations2 array<struct<f1:int,f2:int,f3:int>>)