Repository: spark
Updated Branches:
  refs/heads/master cd6dff78b -> f2d35427e


[SPARK-4502][SQL] Parquet nested column pruning - foundation

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 and includes a subset of its code. 
Relevant review comments from that PR should be considered incorporated by 
reference. Please avoid duplication in review by reviewing that PR first. The 
summary below is an edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to 
read data from a subset of columns, efficiently skipping reads from other 
columns. Spark has long had support for pruning unneeded top-level schema 
fields from the scan of a parquet file. For example, consider a table, 
`contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, 
`name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to 
answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With 
this patch, Spark reads only the `name.first` column to answer the previous 
query.
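
To make the column counts above concrete, here is a minimal sketch in plain Scala. It does not use Spark: `LeafColumnsSketch`, its `DataType`/`StructType` types and `leafPaths` are hypothetical names invented for this illustration. It enumerates the leaf (physical) columns of the `contacts` schema and of the schema that `select name.first from contacts` needs once nested pruning applies.

```scala
// Toy model of a schema; these types are illustrative, not Spark's.
object LeafColumnsSketch {
  sealed trait DataType
  case object StringType extends DataType
  final case class StructType(fields: List[(String, DataType)]) extends DataType

  // Enumerates the dotted paths of all leaf (non-struct) fields, i.e. the
  // physical columns a columnar format would store for this schema.
  def leafPaths(dt: DataType, prefix: List[String] = Nil): List[String] = dt match {
    case StructType(fields) =>
      fields.flatMap { case (name, child) => leafPaths(child, prefix :+ name) }
    case _ => List(prefix.mkString("."))
  }

  // The `contacts` schema from the description above.
  val contacts: StructType = StructType(List(
    "name" -> StructType(List("first" -> StringType, "last" -> StringType)),
    "address" -> StringType))

  // Schema needed for `select name.first from contacts` with nested pruning.
  val pruned: StructType = StructType(List(
    "name" -> StructType(List("first" -> StringType))))
}
```

`LeafColumnsSketch.leafPaths(LeafColumnsSketch.contacts)` lists the three physical columns, while the pruned schema has a single leaf. In this commit the optimization is guarded by `spark.sql.nestedSchemaPruning.enabled`, which defaults to `false`.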

### Implementation

There are two main components of this patch. First, there is a 
`ParquetSchemaPruning` optimizer rule for gathering the required schema fields 
of a `PhysicalOperation` over a parquet file, constructing a new schema based 
on those required fields and rewriting the plan in terms of that pruned schema. 
The pruned schema fields are pushed down to the parquet requested read schema. 
`ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for 
rewriting a catalyst expression in terms of a pruned schema.
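
The two steps described above (building a pruned schema, then rewriting field ordinals against it) can be sketched with a toy model. This is an illustration under stated assumptions, not the code in this patch: `SchemaPruningSketch`, `prune` and `ordinal` are hypothetical names, and the toy types only mimic the shape of Spark's schema classes.

```scala
// Toy model; these types only mimic the shape of Spark's schema classes.
object SchemaPruningSketch {
  sealed trait DataType
  case object StringType extends DataType
  final case class StructType(fields: List[(String, DataType)]) extends DataType {
    def fieldIndex(name: String): Int = fields.indexWhere(_._1 == name)
  }

  // Keeps only the fields named by the requested dotted paths, preserving
  // the field order of the original schema.
  def prune(schema: StructType, paths: List[List[String]]): StructType =
    StructType(schema.fields.flatMap { case (name, dt) =>
      // Remainders of the requested paths that start at this field.
      val tails = paths.collect { case `name` :: rest => rest }
      val kept: List[(String, DataType)] = dt match {
        case _ if tails.isEmpty => Nil
        case s: StructType if !tails.contains(Nil) => List(name -> prune(s, tails))
        case _ => List(name -> dt)
      }
      kept
    })

  // Ordinal of the last path element within its enclosing struct, if present.
  def ordinal(schema: StructType, path: List[String]): Option[Int] = path match {
    case Nil => None
    case field :: Nil => Some(schema.fieldIndex(field)).filter(_ >= 0)
    case head :: tail =>
      schema.fields
        .collectFirst { case (`head`, s: StructType) => s }
        .flatMap(ordinal(_, tail))
  }

  val contacts: StructType = StructType(List(
    "name" -> StructType(List("first" -> StringType, "last" -> StringType)),
    "address" -> StringType))

  // Schema required by `select name.last from contacts`.
  val pruned: StructType = prune(contacts, List(List("name", "last")))
}
```

In the full schema `name.last` sits at ordinal 1 inside `name`; in the pruned schema it sits at ordinal 0. An expression that extracts the field by ordinal therefore has to be rewritten against the pruned schema, which is the role the description assigns to `ProjectionOverSchema`.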

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of 
the parquet columns read are correct for the pruned schema. 
`ParquetReadSupport` has been patched to address a compatibility mismatch 
between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column 
pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more 
advanced version of this patch incorporating optimizations for rewriting 
queries involving aggregations and joins has been running on a production Spark 
cluster at VideoAmp for several years. In that time, one bug was found and 
fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been running 
this patch against Spark 2.x branches on ad-hoc clusters since then.

Closes #21320 from mallman/spark-4502-parquet_column_pruning-foundation.

Lead-authored-by: Michael Allman <m...@allman.ms>
Co-authored-by: Adam Jacques <a...@technowizardry.net>
Co-authored-by: Michael Allman <mich...@videoamp.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2d35427
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2d35427
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2d35427

Branch: refs/heads/master
Commit: f2d35427eedeacceb6edb8a51974a7e8bbb94bc2
Parents: cd6dff7
Author: Michael Allman <m...@allman.ms>
Authored: Thu Aug 23 21:31:10 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Aug 23 21:31:10 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  14 +-
 .../spark/sql/catalyst/SchemaPruningTest.scala  |  45 ++
 .../sql/execution/GetStructFieldObject.scala    |  33 ++
 .../sql/execution/ProjectionOverSchema.scala    |  59 +++
 .../spark/sql/execution/SelectedField.scala     | 134 ++++++
 .../spark/sql/execution/SparkOptimizer.scala    |   4 +-
 .../parquet/ParquetSchemaPruning.scala          | 257 +++++++++++
 .../sql/execution/SelectedFieldSuite.scala      | 455 +++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../parquet/ParquetSchemaPruningSuite.scala     | 311 +++++++++++++
 10 files changed, 1313 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index df2caff..ef3ce98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1419,8 +1419,18 @@ object SQLConf {
         "issues. Turn on this config to insert a local sort before actually doing repartition " +
         "to generate consistent repartition results. The performance of repartition() may go " +
         "down since we insert extra local sort before it.")
+        .booleanConf
+        .createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+    buildConf("spark.sql.nestedSchemaPruning.enabled")
+      .internal()
+      .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
+        "satisfying a query. This optimization allows columnar file format readers to avoid " +
+        "reading unnecessary nested column data. Currently Parquet is the only data source that " +
+        "implements this optimization.")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val TOP_K_SORT_FALLBACK_THRESHOLD =
     buildConf("spark.sql.execution.topKSortFallbackThreshold")
@@ -1895,6 +1905,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
+  def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
+
   def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
 
   def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala
new file mode 100644
index 0000000..68e76fc
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SchemaPruningTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.internal.SQLConf.NESTED_SCHEMA_PRUNING_ENABLED
+
+/**
+ * A PlanTest that ensures that all tests in this suite are run with nested schema pruning enabled.
+ * Remove this trait once the default value of SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is set to true.
+ */
+private[sql] trait SchemaPruningTest extends PlanTest with BeforeAndAfterAll {
+  private var originalConfSchemaPruningEnabled = false
+
+  override protected def beforeAll(): Unit = {
+    originalConfSchemaPruningEnabled = conf.nestedSchemaPruningEnabled
+    conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, true)
+    super.beforeAll()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, originalConfSchemaPruningEnabled)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala
new file mode 100644
index 0000000..c88b2f8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GetStructFieldObject.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor which returns the field
+ * ordinal instead of the field itself.
+ */
+private[execution] object GetStructFieldObject {
+  def unapply(getStructField: GetStructField): Option[(Expression, StructField)] =
+    Some((
+      getStructField.child,
+      getStructField.childSchema(getStructField.ordinal)))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala
new file mode 100644
index 0000000..2236f18
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. This
+ * class is motivated by columnar nested schema pruning.
+ */
+private[execution] case class ProjectionOverSchema(schema: StructType) {
+  private val fieldNames = schema.fieldNames.toSet
+
+  def unapply(expr: Expression): Option[Expression] = getProjection(expr)
+
+  private def getProjection(expr: Expression): Option[Expression] =
+    expr match {
+      case a: AttributeReference if fieldNames.contains(a.name) =>
+        Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
+      case GetArrayItem(child, arrayItemOrdinal) =>
+        getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }
+      case a: GetArrayStructFields =>
+        getProjection(a.child).map(p => (p, p.dataType)).map {
+          case (projection, ArrayType(projSchema @ StructType(_), _)) =>
+            GetArrayStructFields(projection,
+              projSchema(a.field.name),
+              projSchema.fieldIndex(a.field.name),
+              projSchema.size,
+              a.containsNull)
+        }
+      case GetMapValue(child, key) =>
+        getProjection(child).map { projection => GetMapValue(projection, key) }
+      case GetStructFieldObject(child, field: StructField) =>
+        getProjection(child).map(p => (p, p.dataType)).map {
+          case (projection, projSchema: StructType) =>
+            GetStructField(projection, projSchema.fieldIndex(field.name))
+        }
+      case _ =>
+        None
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala
new file mode 100644
index 0000000..0e7c593
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the following schema:
+ *
+ * {{{
+ * root
+ * |-- name: struct (nullable = true)
+ * |    |-- first: string (nullable = true)
+ * |    |-- last: string (nullable = true)
+ * }}}
+ *
+ * Further, suppose we take the select expression `name.first`. This will parse into an
+ * `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
+ *
+ * {{{
+ * GetStructFieldObject(
+ *   AttributeReference("name", StructType(_), _, _),
+ *   StructField("first", StringType, _, _))
+ * }}}
+ *
+ * [[SelectedField]] converts that expression into
+ *
+ * {{{
+ * StructField("name", StructType(Array(StructField("first", StringType))))
+ * }}}
+ *
+ * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
+ * same name as its child (or "parent" going right to left in the select expression) and a data
+ * type appropriate to the complex type extractor. In our example, the name of the child expression
+ * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
+ * field named "first".
+ *
+ * @param expr the top-level complex type extractor
+ */
+private[execution] object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+    // If this expression is an alias, work on its child instead
+    val unaliased = expr match {
+      case Alias(child, _) => child
+      case expr => expr
+    }
+    selectField(unaliased, None)
+  }
+
+  private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = {
+    expr match {
+      // No children. Returns a StructField with the attribute name or None if fieldOpt is None.
+      case AttributeReference(name, dataType, nullable, metadata) =>
+        fieldOpt.map(field =>
+          StructField(name, wrapStructType(dataType, field), nullable, metadata))
+      // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of
+      // array type.
+      case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name,
+          dataType, nullable, metadata)), _) =>
+        val childField = fieldOpt.map(field => StructField(name,
+          wrapStructType(dataType, field), nullable, metadata)).getOrElse(field)
+        selectField(child, Some(childField))
+      // Handles case "expr0.field[n]", where "expr0.field" is of array type.
+      case GetArrayItem(child, _) =>
+        selectField(child, fieldOpt)
+      // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
+      case GetArrayStructFields(child: GetArrayStructFields,
+          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
+        val childField = fieldOpt.map(field => StructField(name,
+            wrapStructType(dataType, field),
+            nullable, metadata)).orElse(Some(field))
+        selectField(child, childField)
+      // Handles case "expr0.field", where "expr0" is of array type.
+      case GetArrayStructFields(child,
+          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
+        val childField =
+          fieldOpt.map(field => StructField(name,
+            wrapStructType(dataType, field),
+            nullable, metadata)).orElse(Some(field))
+        selectField(child, childField)
+      // Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
+      // map type.
+      case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name,
+          dataType,
+          nullable, metadata)), _) =>
+        val childField = fieldOpt.map(field => StructField(name,
+          wrapStructType(dataType, field),
+          nullable, metadata)).orElse(Some(field))
+        selectField(child, childField)
+      // Handles case "expr0.field[key]", where "expr0.field" is of map type.
+      case GetMapValue(child, _) =>
+        selectField(child, fieldOpt)
+      // Handles case "expr0.field", where expr0 is of struct type.
+      case GetStructFieldObject(child,
+        field @ StructField(name, dataType, nullable, metadata)) =>
+        val childField = fieldOpt.map(field => StructField(name,
+          wrapStructType(dataType, field),
+          nullable, metadata)).orElse(Some(field))
+        selectField(child, childField)
+      case _ =>
+        None
+    }
+  }
+
+  // Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns
+  // a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType.
+  private def wrapStructType(dataType: DataType, field: StructField): DataType = {
+    dataType match {
+      case _: StructType =>
+        StructType(Array(field))
+      case ArrayType(elementType, containsNull) =>
+        ArrayType(wrapStructType(elementType, field), containsNull)
+      case MapType(keyType, valueType, valueContainsNull) =>
+        MapType(keyType, wrapStructType(valueType, field), valueContainsNull)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 64d3f2c..969def7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 
 class SparkOptimizer(
@@ -31,7 +32,8 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+    Batch("Parquet Schema Pruning", Once, ParquetSchemaPruning)) ++
     postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
new file mode 100644
index 0000000..6a46b5f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    if (SQLConf.get.nestedSchemaPruningEnabled) {
+      apply0(plan)
+    } else {
+      plan
+    }
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case op @ PhysicalOperation(projects, filters,
+          l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _))
+        if canPruneRelation(hadoopFsRelation) =>
+        val (normalizedProjects, normalizedFilters) =
+          normalizeAttributeRefNames(l, projects, filters)
+        val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
+
+        // If requestedRootFields includes a nested field, continue. Otherwise,
+        // return op
+        if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
+          val dataSchema = hadoopFsRelation.dataSchema
+          val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
+
+          // If the data schema is different from the pruned data schema, continue. Otherwise,
+          // return op. We effect this comparison by counting the number of "leaf" fields in
+          // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
+          // in dataSchema.
+          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+            val prunedParquetRelation =
+              hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+            val prunedRelation = buildPrunedRelation(l, prunedParquetRelation)
+            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+
+            buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
+              projectionOverSchema)
+          } else {
+            op
+          }
+        } else {
+          op
+        }
+    }
+
+  /**
+   * Checks to see if the given relation is Parquet and can be pruned.
+   */
+  private def canPruneRelation(fsRelation: HadoopFsRelation) =
+    fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]
+
+  /**
+   * Normalizes the names of the attribute references in the given projects and filters to reflect
+   * the names in the given logical relation. This makes it possible to compare attributes and
+   * fields by name. Returns a tuple with the normalized projects and filters, respectively.
+   */
+  private def normalizeAttributeRefNames(
+      logicalRelation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = {
+    val normalizedAttNameMap = logicalRelation.output.map(att => (att.exprId, att.name)).toMap
+    val normalizedProjects = projects.map(_.transform {
+      case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) =>
+        att.withName(normalizedAttNameMap(att.exprId))
+    }).map { case expr: NamedExpression => expr }
+    val normalizedFilters = filters.map(_.transform {
+      case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) =>
+        att.withName(normalizedAttNameMap(att.exprId))
+    })
+    (normalizedProjects, normalizedFilters)
+  }
+
+  /**
+   * Returns the set of fields from the Parquet file that the query plan needs.
+   */
+  private def identifyRootFields(projects: Seq[NamedExpression], filters: Seq[Expression]) = {
+    val projectionRootFields = projects.flatMap(getRootFields)
+    val filterRootFields = filters.flatMap(getRootFields)
+
+    (projectionRootFields ++ filterRootFields).distinct
+  }
+
+  /**
+   * Builds the new output [[Project]] Spark SQL operator that has the pruned output relation.
+   */
+  private def buildNewProjection(
+      projects: Seq[NamedExpression], filters: Seq[Expression], prunedRelation: LogicalRelation,
+      projectionOverSchema: ProjectionOverSchema) = {
+    // Construct a new target for our projection by rewriting and
+    // including the original filters where available
+    val projectionChild =
+      if (filters.nonEmpty) {
+        val projectedFilters = filters.map(_.transformDown {
+          case projectionOverSchema(expr) => expr
+        })
+        val newFilterCondition = projectedFilters.reduce(And)
+        Filter(newFilterCondition, prunedRelation)
+      } else {
+        prunedRelation
+      }
+
+    // Construct the new projections of our Project by
+    // rewriting the original projections
+    val newProjects = projects.map(_.transformDown {
+      case projectionOverSchema(expr) => expr
+    }).map { case expr: NamedExpression => expr }
+
+    if (log.isDebugEnabled) {
+      logDebug(s"New projects:\n${newProjects.map(_.treeString).mkString("\n")}")
+    }
+
+    Project(newProjects, projectionChild)
+  }
+
+  /**
+   * Filters the schema from the given file by the requested fields.
+   * Schema field ordering from the file is preserved.
+   */
+  private def pruneDataSchema(
+      fileDataSchema: StructType,
+      requestedRootFields: Seq[RootField]) = {
+    // Merge the requested root fields into a single schema. Note the ordering of the fields
+    // in the resulting schema may differ from their ordering in the logical relation's
+    // original schema
+    val mergedSchema = requestedRootFields
+      .map { case RootField(field, _) => StructType(Array(field)) }
+      .reduceLeft(_ merge _)
+    val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
+    val mergedDataSchema =
+      StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+    // Sort the fields of mergedDataSchema according to their order in dataSchema,
+    // recursively. This makes mergedDataSchema a pruned schema of dataSchema
+    sortLeftFieldsByRight(mergedDataSchema, fileDataSchema).asInstanceOf[StructType]
+  }
+
+  /**
+   * Builds a pruned logical relation from the output of the output relation and the schema of the
+   * pruned base relation.
+   */
+  private def buildPrunedRelation(
+      outputRelation: LogicalRelation,
+      prunedBaseRelation: HadoopFsRelation) = {
+    // We need to replace the expression ids of the pruned relation output attributes
+    // with the expression ids of the original relation output attributes so that
+    // references to the original relation's output are not broken
+    val outputIdMap = outputRelation.output.map(att => (att.name, att.exprId)).toMap
+    val prunedRelationOutput =
+      prunedBaseRelation
+        .schema
+        .toAttributes
+        .map {
+          case att if outputIdMap.contains(att.name) =>
+            att.withExprId(outputIdMap(att.name))
+          case att => att
+        }
+    outputRelation.copy(relation = prunedBaseRelation, output = prunedRelationOutput)
+  }
+
+  /**
+   * Gets the root (aka top-level, no-parent) [[StructField]]s for the given [[Expression]].
+   * When expr is an [[Attribute]], construct a field around it and indicate that that
+   * field was derived from an attribute.
+   */
+  private def getRootFields(expr: Expression): Seq[RootField] = {
+    expr match {
+      case att: Attribute =>
+        RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
+      case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
+      case _ =>
+        expr.children.flatMap(getRootFields)
+    }
+  }
+
+  /**
+   * Counts the "leaf" fields of the given dataType. Informally, this is the
+   * number of fields of non-complex data type in the tree representation of
+   * [[DataType]].
+   */
+  private def countLeaves(dataType: DataType): Int = {
+    dataType match {
+      case array: ArrayType => countLeaves(array.elementType)
+      case map: MapType => countLeaves(map.keyType) + countLeaves(map.valueType)
+      case struct: StructType =>
+        struct.map(field => countLeaves(field.dataType)).sum
+      case _ => 1
+    }
+  }
+
+  /**
+   * Sorts the fields and descendant fields of structs in left according to their order in
+   * right. This function assumes that the fields of left are a subset of the fields of
+   * right, recursively. That is, left is a "subschema" of right, ignoring order of
+   * fields.
+   */
+  private def sortLeftFieldsByRight(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElementType, containsNull), ArrayType(rightElementType, _)) =>
+        ArrayType(
+          sortLeftFieldsByRight(leftElementType, rightElementType),
+          containsNull)
+      case (MapType(leftKeyType, leftValueType, containsNull),
+          MapType(rightKeyType, rightValueType, _)) =>
+        MapType(
+          sortLeftFieldsByRight(leftKeyType, rightKeyType),
+          sortLeftFieldsByRight(leftValueType, rightValueType),
+          containsNull)
+      case (leftStruct: StructType, rightStruct: StructType) =>
+        val filteredRightFieldNames = rightStruct.fieldNames.filter(leftStruct.fieldNames.contains)
+        val sortedLeftFields = filteredRightFieldNames.map { fieldName =>
+          val leftField = leftStruct(fieldName)
+          val rightFieldType = rightStruct(fieldName).dataType
+          val sortedLeftFieldType = sortLeftFieldsByRight(leftField.dataType, rightFieldType)
+          leftField.copy(dataType = sortedLeftFieldType)
+        }
+        StructType(sortedLeftFields)
+      case _ => left
+    }
+
+  /**
+   * A "root" schema field (aka top-level, no-parent) and whether it was derived from
+   * an attribute or had a proper child.
+   */
+  private case class RootField(field: StructField, derivedFromAtt: Boolean)
+}
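
The ordering step used by `pruneDataSchema` can be illustrated without Spark on the classpath. The following is a minimal sketch, not the patch's code: `DType`, `Field`, `StructT` and the other type names are hypothetical stand-ins for Spark's `DataType` hierarchy, and `sortLeftByRight` is an illustrative analogue of `sortLeftFieldsByRight`. This sketch copies each left field and replaces only its data type.

```scala
object SchemaSketch {
  // Hypothetical, simplified stand-ins for Spark's DataType hierarchy (assumption).
  sealed trait DType
  case object IntT extends DType
  case object StrT extends DType
  final case class ArrT(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class Field(name: String, dataType: DType, nullable: Boolean = true)
  final case class StructT(fields: List[Field]) extends DType

  // Counts the fields of non-complex type in the tree representation of a type.
  def countLeaves(dt: DType): Int = dt match {
    case ArrT(e) => countLeaves(e)
    case MapT(k, v) => countLeaves(k) + countLeaves(v)
    case StructT(fs) => fs.map(f => countLeaves(f.dataType)).sum
    case _ => 1
  }

  // Reorders the struct fields of `left` to follow their order in `right`, recursively.
  // Assumes `left` is a subschema of `right`, ignoring field order.
  def sortLeftByRight(left: DType, right: DType): DType = (left, right) match {
    case (ArrT(l), ArrT(r)) => ArrT(sortLeftByRight(l, r))
    case (MapT(lk, lv), MapT(rk, rv)) =>
      MapT(sortLeftByRight(lk, rk), sortLeftByRight(lv, rv))
    case (StructT(ls), StructT(rs)) =>
      val leftByName = ls.map(f => f.name -> f).toMap
      StructT(rs.collect {
        case r if leftByName.contains(r.name) =>
          val l = leftByName(r.name)
          l.copy(dataType = sortLeftByRight(l.dataType, r.dataType))
      })
    case _ => left
  }

  // Schema as stored in the file.
  val fileSchema: DType = StructT(List(
    Field("id", IntT),
    Field("name", StructT(List(
      Field("first", StrT), Field("middle", StrT), Field("last", StrT))))))

  // Merged requested schema, with fields in a different order than the file.
  val requested: DType = StructT(List(
    Field("name", StructT(List(Field("last", StrT), Field("first", StrT)))),
    Field("id", IntT)))

  def main(args: Array[String]): Unit = {
    println(countLeaves(fileSchema))
    println(sortLeftByRight(requested, fileSchema))
  }
}
```

In this example `fileSchema` has four leaves (`id` plus the three `name` subfields), and sorting `requested` by `fileSchema` yields `id` followed by `name` with subfields `first`, `last`, which is the file's order restricted to the requested fields.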

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala
new file mode 100644
index 0000000..05f7e3c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  private val ignoredField = StructField("col1", StringType, nullable = false)
+
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field1: integer (nullable = true)
+  //  |    |-- field6: struct (nullable = true)
+  //  |    |    |-- subfield1: string (nullable = false)
+  //  |    |    |-- subfield2: string (nullable = true)
+  //  |    |-- field7: struct (nullable = true)
+  //  |    |    |-- subfield1: struct (nullable = true)
+  //  |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field9: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: integer (valueContainsNull = false)
+  private val nestedComplex = StructType(ignoredField ::
+      StructField("col2", StructType(
+        StructField("field1", IntegerType) ::
+        StructField("field6", StructType(
+          StructField("subfield1", StringType, nullable = false) ::
+          StructField("subfield2", StringType) :: Nil)) ::
+        StructField("field7", StructType(
+          StructField("subfield1", StructType(
+            StructField("subsubfield1", IntegerType) ::
+            StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)) ::
+        StructField("field9",
+          MapType(StringType, IntegerType, valueContainsNull = false)) :: Nil)) :: Nil)
+
+  test("SelectedField should not match an attribute reference") {
+    val testRelation = LocalRelation(nestedComplex.toAttributes)
+    assertResult(None)(unapplySelect("col1", testRelation))
+    assertResult(None)(unapplySelect("col1 as foo", testRelation))
+    assertResult(None)(unapplySelect("col2", testRelation))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field2: array (nullable = true)
+  //  |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field3: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield3: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = true)
+  private val structOfArray = StructType(ignoredField ::
+    StructField("col2", StructType(
+      StructField("field2", ArrayType(IntegerType, containsNull = false)) ::
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", IntegerType) ::
+          StructField("subfield3", ArrayType(IntegerType)) :: Nil)), nullable = false)
+      :: Nil))
+    :: Nil)
+
+  testSelect(structOfArray, "col2.field2", "col2.field2[0] as foo") {
+    StructField("col2", StructType(
+      StructField("field2", ArrayType(IntegerType, containsNull = false)) :: Nil))
+  }
+
+  testSelect(nestedComplex, "col2.field9", "col2.field9['foo'] as foo") {
+    StructField("col2", StructType(
+      StructField("field9", MapType(StringType, IntegerType, valueContainsNull = false)) :: Nil))
+  }
+
+  testSelect(structOfArray, "col2.field3.subfield3", "col2.field3[0].subfield3 as foo",
+      "col2.field3.subfield3[0] as foo", "col2.field3[0].subfield3[0] as foo") {
+    StructField("col2", StructType(
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield3", ArrayType(IntegerType)) :: Nil)), nullable = false) :: Nil))
+  }
+
+  testSelect(structOfArray, "col2.field3.subfield1") {
+    StructField("col2", StructType(
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield1", IntegerType) :: Nil)), nullable = false) :: Nil))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field4: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field8: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: array (valueContainsNull = false)
+  //  |    |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |    |-- element: integer (containsNull = false)
+  private val structWithMap = StructType(
+    ignoredField ::
+    StructField("col2", StructType(
+      StructField("field4", MapType(StringType, StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", ArrayType(IntegerType, containsNull = false)) :: Nil
+      ), valueContainsNull = false)) ::
+      StructField("field8", MapType(StringType, ArrayType(StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", ArrayType(IntegerType, containsNull = false)) :: Nil)
+      ), valueContainsNull = false)) :: Nil
+    )) :: Nil
+  )
+
+  testSelect(structWithMap, "col2.field4['foo'].subfield1 as foo") {
+    StructField("col2", StructType(
+      StructField("field4", MapType(StringType, StructType(
+        StructField("subfield1", IntegerType) :: Nil), valueContainsNull = false)) :: Nil))
+  }
+
+  testSelect(structWithMap,
+    "col2.field4['foo'].subfield2 as foo", "col2.field4['foo'].subfield2[0] as foo") {
+    StructField("col2", StructType(
+      StructField("field4", MapType(StringType, StructType(
+        StructField("subfield2", ArrayType(IntegerType, containsNull = false))
+          :: Nil), valueContainsNull = false)) :: Nil))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field5: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: struct (nullable = false)
+  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield2: struct (nullable = true)
+  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
+  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  private val structWithArray = StructType(
+    ignoredField ::
+    StructField("col2", StructType(
+      StructField("field5", ArrayType(StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) ::
+          StructField("subsubfield2", IntegerType) :: Nil), nullable = false) ::
+        StructField("subfield2", StructType(
+          StructField("subsubfield1", StructType(
+            StructField("subsubsubfield1", StringType) :: Nil)) ::
+          StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)), nullable = false) :: Nil)
+    ) :: Nil
+  )
+
+  testSelect(structWithArray, "col2.field5.subfield1") {
+    StructField("col2", StructType(
+      StructField("field5", ArrayType(StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) ::
+          StructField("subsubfield2", IntegerType) :: Nil), nullable = false)
+          :: Nil)), nullable = false) :: Nil))
+  }
+
+  testSelect(structWithArray, "col2.field5.subfield1.subsubfield1") {
+    StructField("col2", StructType(
+      StructField("field5", ArrayType(StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) :: Nil), nullable = false)
+          :: Nil)), nullable = false) :: Nil))
+  }
+
+  testSelect(structWithArray, "col2.field5.subfield2.subsubfield1.subsubsubfield1") {
+    StructField("col2", StructType(
+      StructField("field5", ArrayType(StructType(
+        StructField("subfield2", StructType(
+          StructField("subsubfield1", StructType(
+            StructField("subsubsubfield1", StringType) :: Nil)) :: Nil))
+          :: Nil)), nullable = false) :: Nil))
+  }
+
+  testSelect(structWithMap, "col2.field8['foo'][0].subfield1 as foo") {
+    StructField("col2", StructType(
+      StructField("field8", MapType(StringType, ArrayType(StructType(
+        StructField("subfield1", IntegerType) :: Nil)), valueContainsNull = false)) :: Nil))
+  }
+
+  testSelect(nestedComplex, "col2.field1") {
+    StructField("col2", StructType(
+      StructField("field1", IntegerType) :: Nil))
+  }
+
+  testSelect(nestedComplex, "col2.field6") {
+    StructField("col2", StructType(
+      StructField("field6", StructType(
+        StructField("subfield1", StringType, nullable = false) ::
+        StructField("subfield2", StringType) :: Nil)) :: Nil))
+  }
+
+  testSelect(nestedComplex, "col2.field6.subfield1") {
+    StructField("col2", StructType(
+      StructField("field6", StructType(
+        StructField("subfield1", StringType, nullable = false) :: Nil)) :: Nil))
+  }
+
+  testSelect(nestedComplex, "col2.field7.subfield1") {
+    StructField("col2", StructType(
+      StructField("field7", StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) ::
+          StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)) :: Nil))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col3: array (nullable = false)
+  //  |    |-- element: struct (containsNull = false)
+  //  |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |-- field2: map (nullable = true)
+  //  |    |    |    |-- key: string
+  //  |    |    |    |-- value: integer (valueContainsNull = false)
+  private val arrayWithStructAndMap = StructType(Array(
+    StructField("col3", ArrayType(StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType, nullable = false) ::
+        StructField("subfield2", IntegerType) :: Nil)) ::
+      StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false))
+      :: Nil), containsNull = false), nullable = false)
+  ))
+
+  testSelect(arrayWithStructAndMap, "col3.field1.subfield1") {
+    StructField("col3", ArrayType(StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil))
+        :: Nil), containsNull = false), nullable = false)
+  }
+
+  testSelect(arrayWithStructAndMap, "col3.field2['foo'] as foo") {
+    StructField("col3", ArrayType(StructType(
+      StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false))
+        :: Nil), containsNull = false), nullable = false)
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col4: map (nullable = false)
+  //  |    |-- key: string
+  //  |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |-- field2: map (nullable = true)
+  //  |    |    |    |-- key: string
+  //  |    |    |    |-- value: integer (valueContainsNull = false)
+  private val col4 = StructType(Array(ignoredField,
+    StructField("col4", MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType, nullable = false) ::
+        StructField("subfield2", IntegerType) :: Nil)) ::
+        StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false))
+        :: Nil), valueContainsNull = false), nullable = false)
+  ))
+
+  testSelect(col4, "col4['foo'].field1.subfield1 as foo") {
+    StructField("col4", MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil))
+        :: Nil), valueContainsNull = false), nullable = false)
+  }
+
+  testSelect(col4, "col4['foo'].field2['bar'] as foo") {
+    StructField("col4", MapType(StringType, StructType(
+      StructField("field2", MapType(StringType, IntegerType, valueContainsNull = false))
+        :: Nil), valueContainsNull = false), nullable = false)
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col5: array (nullable = true)
+  //  |    |-- element: map (containsNull = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: integer (nullable = true)
+  private val arrayOfStruct = StructType(Array(ignoredField,
+    StructField("col5", ArrayType(MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType) ::
+        StructField("subfield2", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
+  ))
+
+  testSelect(arrayOfStruct, "col5[0]['foo'].field1.subfield1 as foo") {
+    StructField("col5", ArrayType(MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col6: map (nullable = true)
+  //  |    |-- key: string
+  //  |    |-- value: array (valueContainsNull = true)
+  //  |    |    |-- element: struct (containsNull = false)
+  //  |    |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: integer (nullable = true)
+  private val mapOfArray = StructType(Array(ignoredField,
+    StructField("col6", MapType(StringType, ArrayType(StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType) ::
+        StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))))
+
+  testSelect(mapOfArray, "col6['foo'][0].field1.subfield1 as foo") {
+    StructField("col6", MapType(StringType, ArrayType(StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType) :: Nil)) :: Nil), containsNull = false)))
+  }
+
+  // An array with a struct with different fields
+  //  |-- col1: string (nullable = false)
+  //  |-- col7: array (nullable = true)
+  //  |    |-- element: struct (containsNull = true)
+  //  |    |    |-- field1: integer (nullable = false)
+  //  |    |    |-- field2: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |-- field3: array (nullable = true)
+  //  |    |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = false)
+  private val arrayWithMultipleFields = StructType(Array(ignoredField,
+    StructField("col7", ArrayType(StructType(
+    StructField("field1", IntegerType, nullable = false) ::
+      StructField("field2", StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil)) ::
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil))) :: Nil)))))
+
+  testSelect(arrayWithMultipleFields,
+    "col7.field1", "col7[0].field1 as foo", "col7.field1[0] as foo") {
+    StructField("col7", ArrayType(StructType(
+      StructField("field1", IntegerType, nullable = false) :: Nil)))
+  }
+
+  testSelect(arrayWithMultipleFields, "col7.field2.subfield1") {
+    StructField("col7", ArrayType(StructType(
+      StructField("field2", StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil)) :: Nil)))
+  }
+
+  testSelect(arrayWithMultipleFields, "col7.field3.subfield1") {
+    StructField("col7", ArrayType(StructType(
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield1", IntegerType, nullable = false) :: Nil))) :: Nil)))
+  }
+
+  // Array with a nested int array
+  //  |-- col1: string (nullable = false)
+  //  |-- col8: array (nullable = true)
+  //  |    |-- element: struct (containsNull = true)
+  //  |    |    |-- field1: array (nullable = false)
+  //  |    |    |    |-- element: integer (containsNull = false)
+  private val arrayOfArray = StructType(Array(ignoredField,
+    StructField("col8",
+      ArrayType(StructType(Array(StructField("field1",
+        ArrayType(IntegerType, containsNull = false), nullable = false))))
+    )))
+
+  testSelect(arrayOfArray, "col8.field1",
+    "col8[0].field1 as foo",
+    "col8.field1[0] as foo",
+    "col8[0].field1[0] as foo") {
+    StructField("col8", ArrayType(StructType(
+      StructField("field1", ArrayType(IntegerType, containsNull = false), nullable = false)
+        :: Nil)))
+  }
+
+  def assertResult(expected: StructField)(actual: StructField)(selectExpr: String): Unit = {
+    try {
+      super.assertResult(expected)(actual)
+    } catch {
+      case ex: TestFailedException =>
+        // Print some helpful diagnostics in the case of failure
+        alert("Expected SELECT \"" + selectExpr + "\" to select the schema\n" +
+          indent(StructType(expected :: Nil).treeString) +
+          indent("but it actually selected\n") +
+          indent(StructType(actual :: Nil).treeString) +
+          indent("Note that expected.dataType.sameType(actual.dataType) = " +
+          expected.dataType.sameType(actual.dataType)))
+        throw ex
+    }
+  }
+
+  // Test that the given SELECT expressions prune the test schema to the single-column schema
+  // defined by the given field
+  private def testSelect(inputSchema: StructType, selectExprs: String*)
+                        (expected: StructField) {
+    test(s"SELECT ${selectExprs.map(s => s""""$s"""").mkString(", ")} should select the schema\n" +
+      indent(StructType(expected :: Nil).treeString)) {
+      for (selectExpr <- selectExprs) {
+        assertSelect(selectExpr, expected, inputSchema)
+      }
+    }
+  }
+
+  private def assertSelect(expr: String, expected: StructField, inputSchema: StructType): Unit = {
+    val relation = LocalRelation(inputSchema.toAttributes)
+    unapplySelect(expr, relation) match {
+      case Some(field) =>
+        assertResult(expected)(field)(expr)
+      case None =>
+        val failureMessage =
+          "Failed to select a field from " + expr + ". " +
+          "Expected:\n" +
+          StructType(expected :: Nil).treeString
+        fail(failureMessage)
+    }
+  }
+
+  private def unapplySelect(expr: String, relation: LocalRelation) = {
+    val parsedExpr = parseAsCatalystExpression(Seq(expr)).head
+    val select = relation.select(parsedExpr)
+    val analyzed = select.analyze
+    SelectedField.unapply(analyzed.expressions.head)
+  }
+
+  private def parseAsCatalystExpression(exprs: Seq[String]) = {
+    exprs.map(CatalystSqlParser.parseExpression(_) match {
+      case namedExpr: NamedExpression => namedExpr
+    })
+  }
+
+  // Indent every line in `string` by three spaces
+  private def indent(string: String) = string.replaceAll("(?m)^", "   ")
+}
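
The tests above all assert the same property: a nested selection yields a single root field whose type keeps only the selected path, with arrays and maps traversed transparently and nullability preserved. `SelectedField` itself is applied to analyzed Catalyst expressions (see `unapplySelect`); the sketch below is only an analogy that takes a list of field names instead. All names in it (`DType`, `Field`, `select`, `prune`) are hypothetical, and the simplified array and map types omit the `containsNull` and `valueContainsNull` flags.

```scala
object SelectedFieldSketch {
  // Hypothetical, simplified stand-ins for Spark's DataType hierarchy (assumption).
  sealed trait DType
  case object IntT extends DType
  case object StrT extends DType
  final case class ArrT(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class Field(name: String, dataType: DType, nullable: Boolean = true)
  final case class StructT(fields: List[Field]) extends DType

  // Prunes `root` to the single nested path `path`. An empty path (a bare
  // attribute reference) yields None, analogous to the first test in the suite.
  def select(root: Field, path: List[String]): Option[Field] =
    if (path.isEmpty) None
    else prune(root.dataType, path).map(dt => root.copy(dataType = dt))

  private def prune(dt: DType, path: List[String]): Option[DType] = (dt, path) match {
    case (_, Nil) => Some(dt)                                    // end of path: keep whole subtree
    case (ArrT(e), _) => prune(e, path).map(t => ArrT(t))        // look through array elements
    case (MapT(k, v), _) => prune(v, path).map(t => MapT(k, t))  // look through map values
    case (StructT(fs), name :: rest) =>
      fs.find(_.name == name).flatMap { f =>
        prune(f.dataType, rest).map(t => StructT(List(f.copy(dataType = t))))
      }
    case _ => None                                               // path does not exist
  }

  val col2: Field = Field("col2", StructT(List(
    Field("field1", IntT),
    Field("field6", StructT(List(
      Field("subfield1", StrT, nullable = false),
      Field("subfield2", StrT)))))))

  val col7: Field = Field("col7", ArrT(StructT(List(
    Field("field1", IntT, nullable = false),
    Field("field2", StructT(List(Field("subfield1", IntT, nullable = false))))))))

  def main(args: Array[String]): Unit = {
    println(select(col2, List("field6", "subfield1")))
    println(select(col7, List("field1")))
  }
}
```

Selecting `field6.subfield1` from `col2` keeps `col2` and `field6` as wrappers around the single non-nullable `subfield1`, which mirrors the shape of the expected values in the `testSelect` cases.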

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index dbf6377..54c77dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -108,7 +108,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       val queryOutput = selfJoin.queryExecution.analyzed.output
 
       assertResult(4, "Field count mismatches")(queryOutput.size)
-      assertResult(2, "Duplicated expression ID in query plan:\n $selfJoin") {
+      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
         queryOutput.filter(_.name == "_1").map(_.exprId).size
       }
 
@@ -117,7 +117,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   }
 
   test("nested data - struct with array field") {
-    val data = (1 to 10).map(i => Tuple1((i, Seq("val_$i"))))
+    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
     withParquetTable(data, "t") {
       checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
         case Tuple1((_, Seq(string))) => Row(string)
@@ -126,7 +126,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   }
 
   test("nested data - array of struct") {
-    val data = (1 to 10).map(i => Tuple1(Seq(i -> "val_$i")))
+    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
     withParquetTable(data, "t") {
       checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
         case Tuple1(Seq((_, string))) => Row(string)
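
The three one-character changes in this file all add a missing `s` interpolator prefix. A minimal, self-contained illustration of the difference (the object and value names are illustrative only):

```scala
object InterpolationSketch {
  val i = 3

  // Without the `s` prefix the string is taken literally: '$' and 'i' stay as characters.
  val plain: String = "val_$i"

  // With the `s` prefix, $i is replaced by the value of i.
  val interpolated: String = s"val_$i"

  def main(args: Array[String]): Unit = {
    println(plain)        // prints val_$i
    println(interpolated) // prints val_3
  }
}
```

Before the fix, every generated row in the two nested-data tests carried the same literal string, so the tests still passed but exercised less varied data than intended.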

http://git-wip-us.apache.org/repos/asf/spark/blob/f2d35427/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
new file mode 100644
index 0000000..eb99654
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with SchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array.empty,
+    relatives: Map[String, FullName] = Map.empty)
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  private val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  private val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"), Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts")
+    checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1")
+    checkScan(query,
+      "struct<friends:array<struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
+      Row(Array.empty[String], Array.empty[Row]) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
+    val query =
+      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1")
+    checkScan(query,
+      "struct<relatives:map<string,struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, null) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition column") {
+    val query = sql("select name.middle, p from contacts")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
+  }
+
+  ignore("partial schema intersection - select missing subfield") {
+    val query = sql("select name.middle, address from contacts where p=2")
+    checkScan(query, "struct<name:struct<middle:string>,address:string>")
+    checkAnswer(query.orderBy("id"),
+      Row(null, "567 Maple Drive") ::
+      Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+    val query =
+      sql("select id, name.last, name.middle, name.first, relatives[''].last, " +
+        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
+        "friends[0].first, pets, address from contacts where p=2")
+    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
+    // We check this by asserting that the scanned schema of the query is identical to the schema
+    // of the contacts relation, even though the fields are selected in different orders.
+    checkScan(query,
+      "struct<id:int,name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
+      "friends:array<struct<first:string,middle:string,last:string>>," +
+      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(2, "Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
+      Row(3, "Jones", null, "Jim", null, null, null, null, null, null, null, "6242 Ash Street") ::
+      Nil)
+  }
+
+  testSchemaPruning("empty schema intersection") {
+    val query = sql("select name.middle from contacts where p=2")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row(null) :: Row(null) :: Nil)
+  }
+
+  private def testSchemaPruning(testName: String)(testThunk: => Unit) {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      test(s"Spark vectorized reader - without partition data column - $testName") {
+        withContacts(testThunk)
+      }
+      test(s"Spark vectorized reader - with partition data column - $testName") {
+        withContactsWithDataPartitionColumn(testThunk)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+      test(s"Parquet-mr reader - without partition data column - $testName") {
+        withContacts(testThunk)
+      }
+      test(s"Parquet-mr reader - with partition data column - $testName") {
+        withContactsWithDataPartitionColumn(testThunk)
+      }
+    }
+  }
+
+  private def withContacts(testThunk: => Unit) {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      makeParquetFile(contacts, new File(path + "/contacts/p=1"))
+      makeParquetFile(briefContacts, new File(path + "/contacts/p=2"))
+
+      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")
+
+      testThunk
+    }
+  }
+
+  private def withContactsWithDataPartitionColumn(testThunk: => Unit) {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      makeParquetFile(contactsWithDataPartitionColumn, new File(path + "/contacts/p=1"))
+      makeParquetFile(briefContactsWithDataPartitionColumn, new File(path + "/contacts/p=2"))
+
+      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")
+
+      testThunk
+    }
+  }
+
+  case class MixedCaseColumn(a: String, B: Int)
+  case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn)
+
+  private val mixedCaseData =
+    MixedCase(0, "r0c1", MixedCaseColumn("abc", 1)) ::
+    MixedCase(1, "r1c1", MixedCaseColumn("123", 2)) ::
+    Nil
+
+  testMixedCasePruning("select with exact column names") {
+    val query = sql("select CoL1, coL2.B from mixedcase")
+    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
+    checkAnswer(query.orderBy("id"),
+      Row("r0c1", 1) ::
+      Row("r1c1", 2) ::
+      Nil)
+  }
+
+  testMixedCasePruning("select with lowercase column names") {
+    val query = sql("select col1, col2.b from mixedcase")
+    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
+    checkAnswer(query.orderBy("id"),
+      Row("r0c1", 1) ::
+      Row("r1c1", 2) ::
+      Nil)
+  }
+
+  testMixedCasePruning("select with different-case column names") {
+    val query = sql("select cOL1, cOl2.b from mixedcase")
+    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
+    checkAnswer(query.orderBy("id"),
+      Row("r0c1", 1) ::
+      Row("r1c1", 2) ::
+      Nil)
+  }
+
+  testMixedCasePruning("filter with different-case column names") {
+    val query = sql("select id from mixedcase where Col2.b = 2")
+    // Pruning with filters is currently unsupported. As-is, the file reader will read the id column
+    // and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
+    // this line
+    // checkScan(query, "struct<id:int,coL2:struct<B:int>>")
+    checkAnswer(query.orderBy("id"), Row(1) :: Nil)
+  }
+
+  private def testMixedCasePruning(testName: String)(testThunk: => Unit) {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      SQLConf.CASE_SENSITIVE.key -> "true") {
+      test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") {
+          withMixedCaseData(testThunk)
+      }
+    }
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
+      SQLConf.CASE_SENSITIVE.key -> "false") {
+      test(s"Parquet-mr reader - case-insensitive parser - mixed-case schema - $testName") {
+        withMixedCaseData(testThunk)
+      }
+    }
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      SQLConf.CASE_SENSITIVE.key -> "false") {
+      test(s"Spark vectorized reader - case-insensitive parser - mixed-case schema - $testName") {
+          withMixedCaseData(testThunk)
+      }
+    }
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
+      SQLConf.CASE_SENSITIVE.key -> "true") {
+      test(s"Parquet-mr reader - case-sensitive parser - mixed-case schema - $testName") {
+        withMixedCaseData(testThunk)
+      }
+    }
+  }
+
+  private def withMixedCaseData(testThunk: => Unit) {
+    withParquetTable(mixedCaseData, "mixedcase") {
+      testThunk
+    }
+  }
+
+  private val schemaEquality = new Equality[StructType] {
+    override def areEqual(a: StructType, b: Any): Boolean =
+      b match {
+        case otherType: StructType => a.sameType(otherType)
+        case _ => false
+      }
+  }
+
+  protected def checkScan(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
+    checkScanSchemata(df, expectedSchemaCatalogStrings: _*)
+    // We check here that we can execute the query without throwing an exception. The results
+    // themselves are irrelevant, and should be checked elsewhere as needed
+    df.collect()
+  }
+
+  private def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
+    val fileSourceScanSchemata =
+      df.queryExecution.executedPlan.collect {
+        case scan: FileSourceScanExec => scan.requiredSchema
+      }
+    assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+      s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+        s"but expected $expectedSchemaCatalogStrings")
+    fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach {
+      case (scanSchema, expectedScanSchemaCatalogString) =>
+        val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString)
+        implicit val equality = schemaEquality
+        assert(scanSchema === expectedScanSchema)
+    }
+  }
+}

