spark git commit: [SPARK-16632][SQL] Respect Hive schema when merging parquet schema.

2016-07-19 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6f209c8fa -> c2b5b3ca5


[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.

When Hive (or at least certain versions of Hive) creates parquet files
containing tinyint or smallint columns, it stores them as int32, but
doesn't annotate the parquet field as containing the corresponding
int8 / int16 data. When Spark reads those files using the vectorized
reader, it follows the parquet schema for these fields, but when
actually reading the data it tries to use the type fetched from
the metastore, and then fails because data has been loaded into the
wrong fields in OnHeapColumnVector.

So instead of blindly trusting the parquet schema, check whether the
Catalyst-provided schema disagrees with it, and adjust the types so
that the necessary metadata is present when loading the data into
the ColumnVector instance.

Tested with unit tests and with tests that create byte / short columns
in Hive and try to read them from Spark.

Author: Marcelo Vanzin 

Closes #14272 from vanzin/SPARK-16632.

(cherry picked from commit 75146be6ba5e9f559f5f15430310bb476ee0812c)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2b5b3ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2b5b3ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2b5b3ca

Branch: refs/heads/branch-2.0
Commit: c2b5b3ca538aaaef946653e60bd68e38c58dc41f
Parents: 6f209c8
Author: Marcelo Vanzin 
Authored: Wed Jul 20 13:00:22 2016 +0800
Committer: Cheng Lian 
Committed: Wed Jul 20 13:49:45 2016 +0800

--
 .../parquet/ParquetReadSupport.scala| 18 +
 .../parquet/ParquetSchemaSuite.scala| 39 
 2 files changed, 57 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c2b5b3ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 12f4974..1628e4c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, 
ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -116,6 +118,12 @@ private[parquet] object ParquetReadSupport {
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+val primName = if (parquetType.isPrimitive()) {
+  parquetType.asPrimitiveType().getPrimitiveTypeName()
+} else {
+  null
+}
+
 catalystType match {
   case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
 // Only clips array types with nested type as element type.
@@ -130,6 +138,16 @@ private[parquet] object ParquetReadSupport {
   case t: StructType =>
 clipParquetGroup(parquetType.asGroupType(), t)
 
+  case _: ByteType if primName == INT32 =>
+// SPARK-16632: Handle case where Hive stores bytes in an int32 field without specifying
+// the original type.
+Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
+
  case _: ShortType if primName == INT32 =>
+// SPARK-16632: Handle case where Hive stores shorts in an int32 field without specifying
+// the original type.
+Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
+
   case _ =>
 // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be able
 // to be mapped to desired user-space types.  So UDTs shouldn't participate schema merging.

http://git-wip-us.apache.org/repos/asf/spark/blob/c2b5b3ca/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 51bb236..215c138 100644

spark git commit: [SPARK-16632][SQL] Respect Hive schema when merging parquet schema.

2016-07-19 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master fc2326362 -> 75146be6b


[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.

When Hive (or at least certain versions of Hive) creates parquet files
containing tinyint or smallint columns, it stores them as int32, but
doesn't annotate the parquet field as containing the corresponding
int8 / int16 data. When Spark reads those files using the vectorized
reader, it follows the parquet schema for these fields, but when
actually reading the data it tries to use the type fetched from
the metastore, and then fails because data has been loaded into the
wrong fields in OnHeapColumnVector.

So instead of blindly trusting the parquet schema, check whether the
Catalyst-provided schema disagrees with it, and adjust the types so
that the necessary metadata is present when loading the data into
the ColumnVector instance.

Tested with unit tests and with tests that create byte / short columns
in Hive and try to read them from Spark.

Author: Marcelo Vanzin 

Closes #14272 from vanzin/SPARK-16632.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75146be6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75146be6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75146be6

Branch: refs/heads/master
Commit: 75146be6ba5e9f559f5f15430310bb476ee0812c
Parents: fc23263
Author: Marcelo Vanzin 
Authored: Wed Jul 20 13:00:22 2016 +0800
Committer: Cheng Lian 
Committed: Wed Jul 20 13:00:22 2016 +0800

--
 .../parquet/ParquetReadSupport.scala| 18 +
 .../parquet/ParquetSchemaSuite.scala| 39 
 2 files changed, 57 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75146be6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index e6ef634..46d786d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, 
ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport {
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+val primName = if (parquetType.isPrimitive()) {
+  parquetType.asPrimitiveType().getPrimitiveTypeName()
+} else {
+  null
+}
+
 catalystType match {
   case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
 // Only clips array types with nested type as element type.
@@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport {
   case t: StructType =>
 clipParquetGroup(parquetType.asGroupType(), t)
 
+  case _: ByteType if primName == INT32 =>
+// SPARK-16632: Handle case where Hive stores bytes in an int32 field without specifying
+// the original type.
+Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
+
+  case _: ShortType if primName == INT32 =>
+// SPARK-16632: Handle case where Hive stores shorts in an int32 field without specifying
+// the original type.
+Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
+
   case _ =>
 // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be able
 // to be mapped to desired user-space types.  So UDTs shouldn't participate schema merging.

http://git-wip-us.apache.org/repos/asf/spark/blob/75146be6/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7..31ebec0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.sca