Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e033fd50f -> 8cdc0d4da


[SPARK-9876] [BRANCH-2.0] Revert "[SPARK-9876][SQL] Update Parquet to 1.8.1."

## What changes were proposed in this pull request?
Since we are quite late in the 2.0 release cycle, it is not clear whether this
upgrade can be tested thoroughly or whether we can resolve the regression issue
observed earlier. This PR reverts #13280 from branch-2.0.

## How was this patch tested?
Existing tests

This reverts commit 776d183c82b424ef7c3cae30537d8afe9b9eee83.

Author: Yin Huai <yh...@databricks.com>

Closes #13450 from yhuai/revertParquet1.8.1-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdc0d4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdc0d4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdc0d4d

Branch: refs/heads/branch-2.0
Commit: 8cdc0d4da6324f48e1a25496dc4bc7635d9472ef
Parents: e033fd5
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Jun 1 16:05:13 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jun 1 16:05:13 2016 -0700

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.2                  | 11 +--
 dev/deps/spark-deps-hadoop-2.3                  | 11 +--
 dev/deps/spark-deps-hadoop-2.4                  | 11 +--
 dev/deps/spark-deps-hadoop-2.6                  | 11 +--
 dev/deps/spark-deps-hadoop-2.7                  | 11 +--
 pom.xml                                         |  2 +-
 .../SpecificParquetRecordReaderBase.java        | 20 ++---
 .../parquet/CatalystReadSupport.scala           | 12 +--
 .../parquet/CatalystSchemaConverter.scala       | 16 ----
 .../datasources/parquet/ParquetFilters.scala    | 83 ++++++++++++++++----
 .../parquet/ParquetSchemaSuite.scala            | 20 ++---
 11 files changed, 117 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index b5c38a6..96001ea 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,13 +129,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 969df04..9f3d9ad 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index f0491ec..77d5266 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index b3dced6..9afe50f 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,13 +144,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 16f60f2..879157a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,13 +145,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 60c8c8d..79ee787 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.11.1.1</derby.version>
-    <parquet.version>1.8.1</parquet.version>
+    <parquet.version>1.7.0</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 3f7a872..cbe8f78 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,8 +58,6 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
 
@@ -188,19 +186,15 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      if (columns.size() > 0) {
-        Types.MessageTypeBuilder builder = Types.buildMessage();
-        for (String s: columns) {
-          if (!fileSchema.containsField(s)) {
-            throw new IOException("Can only project existing columns. Unknown field: " + s +
-                    " File schema:\n" + fileSchema);
-          }
-          builder.addFields(fileSchema.getType(s));
+      Types.MessageTypeBuilder builder = Types.buildMessage();
+      for (String s: columns) {
+        if (!fileSchema.containsField(s)) {
+          throw new IOException("Can only project existing columns. Unknown field: " + s +
+            " File schema:\n" + fileSchema);
         }
-        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
-      } else {
-        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+        builder.addFields(fileSchema.getType(s));
       }
+      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
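
As an aside on the hunk above: the restored branch simply rebuilds the requested Parquet
schema from the projected column names. A minimal standalone sketch of that pattern, using
the same parquet-mr Types builder API that appears in the diff ("project" is a hypothetical
helper written here for illustration, not Spark code):

  import org.apache.parquet.schema.{MessageType, Types}

  // Hypothetical helper mirroring the restored logic: keep only the requested columns
  // from the file schema, failing fast on unknown field names.
  def project(fileSchema: MessageType, columns: Seq[String]): MessageType = {
    val builder = Types.buildMessage()
    columns.foreach { c =>
      require(fileSchema.containsField(c), s"Can only project existing columns. Unknown field: $c")
      builder.addFields(fileSchema.getType(c))
    }
    builder.named("spark_schema")
  }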

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 9c885b2..850e807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,14 +109,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    if (clippedParquetFields.isEmpty) {
-      CatalystSchemaConverter.EMPTY_MESSAGE
-    } else {
-      Types
-        .buildMessage()
-        .addFields(clippedParquetFields: _*)
-        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-    }
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
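
A note on the hunk above: the empty-schema special case is gone because parquet-mr 1.7.0
accepts empty group types (the EMPTY_MESSAGE hack removed elsewhere in this revert existed
only for 1.8.1). An illustrative sketch, not part of this commit, of what the unconditional
path yields when no fields survive clipping:

  import org.apache.parquet.schema.Types

  // Illustrative only: with zero fields added the builder produces "message spark_schema {}",
  // which parquet-mr 1.7.0 tolerates, whereas 1.8.1 rejects empty group types.
  val emptyMessage = Types.buildMessage().named("spark_schema")
  println(emptyMessage)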

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 3688c3e..6f6340f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,22 +538,6 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
-  // !! HACK ALERT !!
-  //
-  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
-  // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
-  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
-  //
-  // To workaround this problem, here we first construct a `MessageType` with a single dummy
-  // field, and then remove the field to obtain an empty `MessageType`.
-  //
-  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
-  val EMPTY_MESSAGE = Types
-      .buildMessage()
-      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  EMPTY_MESSAGE.getFields.clear()
-
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 6240812..95afdc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,6 +22,8 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -51,15 +53,18 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+     */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -74,14 +79,17 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+     */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -94,13 +102,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
+          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -113,13 +124,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
+          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -133,13 +147,15 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
+          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -152,13 +168,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
+          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -175,14 +194,17 @@ private[sql] object ParquetFilters {
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
+          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
     case BinaryType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
+          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+     */
   }
 
   /**
@@ -206,6 +228,8 @@ private[sql] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
+    relaxParquetValidTypeMap
+
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -275,4 +299,35 @@ private[sql] object ParquetFilters {
       case _ => None
     }
   }
+
+  // !! HACK ALERT !!
+  //
+  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
+  // parquet-mr 1.8.1 or higher versions.
+  //
+  // In Parquet, not all types of columns can be used for filter push-down optimization.  The set
+  // of valid column types is controlled by `ValidTypeMap`.  Unfortunately, in parquet-mr 1.7.0 and
+  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
+  // pushed down.
+  //
+  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
+  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`.  Thus,
+  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
+  // legal except that it fails the `ValidTypeMap` check.
+  //
+  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
+  private lazy val relaxParquetValidTypeMap: Unit = {
+    val constructor = Class
+      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
+      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
+
+    constructor.setAccessible(true)
+    val enumTypeDescriptor = constructor
+      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
+      .asInstanceOf[AnyRef]
+
+    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
+    addMethod.setAccessible(true)
+    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
+  }
 }
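
For context on the push-down builders restored in this file, a tiny freestanding sketch
(illustrative only; the column names and values are invented) of the parquet-mr filter2
predicates that makeEq and makeGt construct for supported types:

  import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
  import org.apache.parquet.filter2.predicate.FilterApi.{doubleColumn, intColumn}

  // Illustrative predicates of the same shape the builders above emit.
  val eqPred: FilterPredicate = FilterApi.eq(intColumn("id"), java.lang.Integer.valueOf(1))
  val gtPred: FilterPredicate = FilterApi.gt(doubleColumn("price"), java.lang.Double.valueOf(9.99))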

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdc0d4d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0b5038c..6db6492 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.apache.parquet.schema.MessageTypeParser
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,26 +1065,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: String): Unit = {
-    testSchemaClipping(testName, parquetSchema, catalystSchema,
-      MessageTypeParser.parseMessageType(expectedSchema))
-  }
-
-  private def testSchemaClipping(
-      testName: String,
-      parquetSchema: String,
-      catalystSchema: StructType,
-      expectedSchema: MessageType): Unit = {
     test(s"Clipping - $testName") {
+      val expected = MessageTypeParser.parseMessageType(expectedSchema)
       val actual = CatalystReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {
-        expectedSchema.checkContains(actual)
-        actual.checkContains(expectedSchema)
+        expected.checkContains(actual)
+        actual.checkContains(expected)
       } catch { case cause: Throwable =>
         fail(
           s"""Expected clipped schema:
-             |$expectedSchema
+             |$expected
              |Actual clipped schema:
              |$actual
            """.stripMargin,
@@ -1437,7 +1429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
+    expectedSchema = "message root {}")
 
   testSchemaClipping(
     "disjoint field sets",

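One note on the reverted test helper above: the expected schema for the empty-Catalyst-schema
case is now passed as a plain string. A hedged sketch (outside the suite, for illustration) of
how that string parses under parquet-mr 1.7.0:

  import org.apache.parquet.schema.MessageTypeParser

  // Illustrative: an empty message parses fine on parquet-mr 1.7.0; 1.8.1 rejects empty group
  // types, which is what the EMPTY_MESSAGE constant removed by this revert worked around.
  val emptyRoot = MessageTypeParser.parseMessageType("message root {}")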
