[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2018-05-14 Thread mallman
Github user mallman closed the pull request at:

https://github.com/apache/spark/pull/16578


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2018-04-15 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r181575614
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 // The following batch should be executed after batch "Join Reorder" 
and "LocalRelation".
 Batch("Check Cartesian Products", Once,
   CheckCartesianProducts) :+
+Batch("Field Extraction Pushdown", fixedPoint,
+  AggregateFieldExtractionPushdown,
+  JoinFieldExtractionPushdown) :+
--- End diff --

Hi @gatorsmile.

Given the scope of your request, can I ask you to provide a reason for it? 
What you ask would invalidate some of the existing conversation and review of 
this PR. It would also substantially restrict the practical usability of this 
patch.

I believe I've written this patch with a logical separation of concerns 
along the lines you've requested. As a compromise, would you consider an 
incremental review starting with the basic projection/filter functionality and 
proceeding to the optimizer rules following them?

BTW I'm traveling for a few weeks, and I'm spending most of my time away 
from work. If I'm delayed in responding, that's the reason. I'll still keep up, 
but at a slower pace.

Thanks.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2018-04-10 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r180495402
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 // The following batch should be executed after batch "Join Reorder" 
and "LocalRelation".
 Batch("Check Cartesian Products", Once,
   CheckCartesianProducts) :+
+Batch("Field Extraction Pushdown", fixedPoint,
+  AggregateFieldExtractionPushdown,
+  JoinFieldExtractionPushdown) :+
--- End diff --

@mallman Could you split these new optimizer rules into two PRs first?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-19 Thread DaimonPl
Github user DaimonPl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151917066
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

So maybe at least make it true for some core sql/parquet test suites?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151597063
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

As far as I know, we have not previously had a config setting applied to all 
tests. We have set a config for a whole test suite, but not for all tests.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread DaimonPl
Github user DaimonPl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151485646
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Nope :( maybe @viirya can give input about it?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151477529
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Ah. Sounds reasonable. Do you know how to do that? Is there a precedent I 
can follow? I'm not aware of one.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread DaimonPl
Github user DaimonPl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151476679
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Just to be clear: I mean making it default to true for all tests in Spark, 
not only those explicitly related to this feature :)


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151474342
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

It needs to be set `true` for the tests. This can be done.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-16 Thread DaimonPl
Github user DaimonPl commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151344821
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

How about making it default true for tests?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-14 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r151026919
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Giving it more thought, I believe it's prudent to choose correctness over 
performance. I will change the default to `false`. "Power users" will set it to 
`true` and (hopefully) report a problem if they run into one.
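
For anyone who wants to opt in once the default is `false`, here is a minimal 
sketch of enabling the flag for a single session. It assumes the key stays 
`spark.sql.nestedSchemaPruning.enabled` as proposed in this diff (it could be 
renamed before merge). The application name, the `/path/to/contacts` path and 
the `name.middle` column are hypothetical placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Opt in to nested schema pruning for this session only.
// The key below is the one proposed in this diff; treat it as an assumption.
val spark = SparkSession.builder()
  .appName("nested-pruning-opt-in") // hypothetical application name
  .master("local[*]")
  .config("spark.sql.nestedSchemaPruning.enabled", "true")
  .getOrCreate()

// Hypothetical Parquet dataset with a nested `name` struct column.
val contacts = spark.read.parquet("/path/to/contacts")

// Print the physical plan so the scan's read schema can be inspected
// and any problem reported back on the PR.
contacts.select("name.middle").explain()
```

Setting the flag per session rather than globally keeps the opt-in easy to 
revert if a query returns unexpected results.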


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r150261547
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
--- End diff --

This is interesting because, if we don't do nested pruning, a Parquet read 
schema that is a superset of the file's schema, such as:

```
message spark_schema {
  optional group name {
    optional binary first (UTF8);
    optional binary middle (UTF8);
    optional binary last (UTF8);
  }
  optional binary address (UTF8);
}
```

won't cause any failure.

Once we perform nested pruning, the required parquet schema becomes:

```
message spark_schema {
  optional group name {
    optional binary middle (UTF8);
  }
  optional binary address (UTF8);
}
```

Then, if we don't remove the group `name` from the required schema, the 
failure happens.
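
To make the empty-group case concrete, here is a minimal sketch of the 
intersection idea in plain Scala. It is not the code in this PR: the `Field`, 
`Leaf` and `Group` types and the `intersect` helper are hypothetical stand-ins 
for Parquet's schema types, and the example file schema (a file that has no 
`name.middle`) is an assumption.

```scala
// Minimal model of a Parquet-like schema: a field is either a primitive leaf
// or a group of named child fields. These types are illustrative only.
sealed trait Field { def name: String }
final case class Leaf(name: String) extends Field
final case class Group(name: String, children: List[Field]) extends Field

object SchemaIntersection {
  // Keep only the parts of `requested` that also exist in `file`.
  // A group left with no children is dropped entirely, so the reader is
  // never asked for an empty group.
  def intersect(requested: Field, file: Field): Option[Field] =
    (requested, file) match {
      case (Leaf(n), Leaf(m)) if n == m => Some(Leaf(n))
      case (Group(n, reqChildren), Group(m, fileChildren)) if n == m =>
        val fileByName = fileChildren.map(f => f.name -> f).toMap
        val kept = reqChildren.flatMap { child =>
          fileByName.get(child.name).flatMap(intersect(child, _))
        }
        if (kept.isEmpty) None else Some(Group(n, kept))
      case _ => None
    }
}

object Example {
  // Assumed file schema: `name` has no `middle` subfield.
  val fileSchema: Field = Group("spark_schema", List(
    Group("name", List(Leaf("first"), Leaf("last"))),
    Leaf("address")))

  // Pruned request from the comment above: only `name.middle` and `address`.
  val requested: Field = Group("spark_schema", List(
    Group("name", List(Leaf("middle"))),
    Leaf("address")))

  // The `name` group becomes empty after intersection and is removed,
  // leaving only `address`.
  val result: Option[Field] = SchemaIntersection.intersect(requested, fileSchema)
}
```

With these inputs, `Example.result` contains only the `address` leaf under 
`spark_schema`: the `name` group is removed rather than kept as an empty group.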



---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r149880006
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

nit: are we confident enough to set it as true by default?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r149293915
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = 
prunedRelationOutput,
+catalogTable = None)
--- End diff --

`catalogTable` can contain data, such as statistics, that might be useful. 
I'm not sure whether setting it to `None` here loses some optimization 
opportunities.


---


[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r149880058
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -961,6 +961,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data.")
--- End diff --

Please also mention that this currently applies only to the Parquet data source.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148866084
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
--- End diff --

On the other hand, if we fix `parquetMrCompatibility` to `true`, then a 
couple of other tests fail. Namely, these tests are

"Filter applied on merged Parquet schema with new column should work" in 
`ParquetFilterSuite.scala`
"read partitioned table - merging compatible schemas" in 
`ParquetPartitionDiscoverySuite.scala`

In both cases, the failures involve queries over multiple files with 
compatible but different schemas.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148863673
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
--- End diff --

As for the problem of requesting a read of a superset of a file's fields, 
if we disable the `parquetMrCompatibility` code, then the "partial schema 
intersection - select missing subfield" test in 
`ParquetSchemaPruningSuite.scala` fails with the following stack trace:

```
[info]   Cause: org.apache.parquet.io.ParquetDecodingException: Can not 
read value at 0 in block -1 in file 
file:/Volumes/VideoAmpCS/msa/workspace/spark-public/target/tmp/spark-a0bda193-9d3f-4cd1-885c-9e8b5b0fc1ed/contacts/p=2/part-1-4a8671f1-afb2-482f-8c4d-4f6f4df896bc-c000.snappy.parquet
[info]   at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:223)
[info]   at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:215)
[info]   at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
[info]   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
[info]   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info]   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:432)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:108)
[info]   at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
[info]   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
[info]   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
[info]   at java.util.ArrayList.get(ArrayList.java:429)
[info]   at 
org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103)
[info]   at 
org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103)
[info]   at 
org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:102)
[info]   at 
org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:97)
[info]   at 
org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:278)
[info]   at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:141)
[info]   at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:107)
[info]   at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:155)
[info]   at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:107)
[info]   at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
[info]   at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:194)
[info]   at 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148731634
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field1: integer (nullable = true)
+  //  |    |-- field2: array (nullable = true)
+  //  |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field3: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield3: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = true)
+  //  |    |-- field4: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field5: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: struct (nullable = false)
+  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield2: struct (nullable = true)
+  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
+  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field6: struct (nullable = true)
+  //  |    |    |-- subfield1: string (nullable = false)
+  //  |    |    |-- subfield2: string (nullable = true)
+  //  |    |-- field7: struct (nullable = true)
+  //  |    |    |-- subfield1: struct (nullable = true)
+  //  |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field8: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: array (valueContainsNull = false)
+  //  |    |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field9: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  |    |-- element: struct (containsNull = false)
+  //  |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |-- field2: map (nullable = true)
+  //  |    |    |    |-- key: string
+  //  |    |    |    |-- value: integer 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148731266
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field1: integer (nullable = true)
+  //  |    |-- field2: array (nullable = true)
+  //  |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field3: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield3: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = true)
+  //  |    |-- field4: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field5: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: struct (nullable = false)
+  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield2: struct (nullable = true)
+  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
+  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field6: struct (nullable = true)
+  //  |    |    |-- subfield1: string (nullable = false)
+  //  |    |    |-- subfield2: string (nullable = true)
+  //  |    |-- field7: struct (nullable = true)
+  //  |    |    |-- subfield1: struct (nullable = true)
+  //  |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field8: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: array (valueContainsNull = false)
+  //  |    |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field9: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  |    |-- element: struct (containsNull = false)
+  //  |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |-- field2: map (nullable = true)
+  //  |    |    |    |-- key: string
+  //  |    |    |    |-- value: integer 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148725914
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --

We can end up requesting a superset of a file's fields when a partitioned 
table has partitions whose files contain only a subset of the table's fields. 
See my related comment and example in `ParquetRowConverter.scala`.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148725482
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalactic.Equality
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+private[sql] trait FileSchemaPruningTest {
+  _: Assertions =>
+
+  private val schemaEquality = new Equality[StructType] {
+override def areEqual(a: StructType, b: Any) =
+  b match {
+case otherType: StructType => a sameType otherType
+case _ => false
+  }
+  }
+
+  protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
+val fileSourceScanSchemata =
--- End diff --

`fileSourceScanSchemata` is a `Seq[StructType]`, so I made it plural.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148725325
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalactic.Equality
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+private[sql] trait FileSchemaPruningTest {
+  _: Assertions =>
+
+  private val schemaEquality = new Equality[StructType] {
+override def areEqual(a: StructType, b: Any) =
+  b match {
+case otherType: StructType => a sameType otherType
+case _ => false
+  }
+  }
+
+  protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
--- End diff --

I used the plural form here because `expectedSchemaCatalogStrings` is a 
varargs parameter.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148725152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned relation output attributes
+// with the expression ids of the original relation output attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap
+val prunedRelationOutput =
--- End diff --

Top-level attributes of struct type whose type has been pruned need to be 
replaced in the logical relation's output. The replacement attributes are 
constructed by the `toAttributes` call on the pruned schema. Their expression 
ids are then set to the expression ids of the original attributes with the 
same names.
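
To illustrate the id replacement described above, here is a minimal, self-contained sketch. `Attr` and `preserveExprIds` are hypothetical stand-ins invented for this example, not Spark classes or methods; the actual code operates on Catalyst attributes and uses `withExprId`.

```scala
// Hypothetical, simplified attribute model. Spark's real attribute classes
// carry more state; only the name, type and expression id matter here.
final case class Attr(name: String, dataType: String, exprId: Long)

object ExprIdRemap {
  // Gives each pruned attribute the expression id of the original attribute
  // with the same name, so existing references to that id stay valid.
  // Attributes without a same-named original keep their own id.
  def preserveExprIds(original: Seq[Attr], pruned: Seq[Attr]): Seq[Attr] = {
    val idsByName = original.map(a => a.name -> a.exprId).toMap
    pruned.map { a =>
      idsByName.get(a.name).fold(a)(id => a.copy(exprId = id))
    }
  }
}
```

The pruned attribute keeps its narrower data type but takes over the original id, which is the property the comment above relies on.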


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148723190
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned relation output attributes
+// with the expression ids of the original relation output attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = prunedRelationOutput,
+catalogTable = None)
--- End diff --

I just made a judgment call here based on the idea that by modifying the 
logical relation it's no longer the same as the catalog table. What do you 
think?


---


[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148722822
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -127,8 +127,8 @@ private[parquet] class ParquetRowConverter(
   extends ParquetGroupConverter(updater) with Logging {
 
   assert(
-parquetType.getFieldCount == catalystType.length,
-s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+parquetType.getFieldCount <= catalystType.length,
--- End diff --

In `ParquetReadSupport.scala`, when `parquetMrCompatibility` is `true`, we 
intersect the clipped parquet schema with the underlying parquet file's schema. 
This can result in a requested parquet schema with fewer fields than the 
requested catalyst schema.

For example, in the case of a partitioned table where we select a column 
which doesn't exist in the schema of one partition's files, we will remove the 
missing column from the requested parquet schema for those files.

This scenario is illustrated and tested by the "partial schema intersection 
- select missing subfield" test in `ParquetSchemaPruningSuite.scala`.
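
As a rough illustration of why the assertion is relaxed to `<=`, the sketch below intersects a requested schema with a file schema. `FieldType`, `Leaf`, `Group` and `intersect` are simplified stand-ins made up for this example; they are not Parquet or Spark types, and the real intersection logic in this PR may differ in detail.

```scala
// Simplified stand-ins for schema types; these are NOT Parquet or Spark classes.
sealed trait FieldType
case object Leaf extends FieldType
final case class Group(fields: Vector[(String, FieldType)]) extends FieldType

object SchemaIntersection {
  // Keeps only the requested fields that also exist in the file's schema,
  // recursing into nested groups. Fields missing from the file are dropped,
  // so the result can have fewer fields than the requested schema.
  def intersect(requested: Group, file: Group): Group = {
    val fileFields = file.fields.toMap
    Group(requested.fields.flatMap { case (name, tpe) =>
      (tpe, fileFields.get(name)) match {
        case (r: Group, Some(f: Group)) => Some(name -> intersect(r, f))
        case (Leaf, Some(Leaf)) => Some(name -> Leaf)
        case _ => None
      }
    })
  }
}
```

If a partition's files lack a requested column, the intersected schema has one field fewer than the requested one, while the Catalyst schema still lists it.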


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148720677
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
+val fieldExtractors =
+  fieldExtractors1
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(childAttributes.contains).nonEmpty)
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
+
+  // Construct the new grouping and aggregate expressions by substituting
+  // each GetStructField expression with a reference to its alias
+  val newAggregateExpressions =
+aggregateExpressions.map(substituteAttributes)
+  .collect { case named: NamedExpression => named }
--- End diff --

I meant it might be like: 
`aggregateExpressions.map(substituteAttributes).asInstanceOf[Seq[NamedExpression]]`.
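
For readers comparing the two forms, here is a small sketch of how `collect` and `asInstanceOf` differ when an element is not of the expected type. `Expr`, `Named` and `Unnamed` are toy types invented for this example, not Catalyst classes. If every substituted expression is in fact a `NamedExpression`, both forms give the same result.

```scala
// Toy expression hierarchy for illustration only.
sealed trait Expr
final case class Named(name: String) extends Expr
final case class Unnamed(value: Int) extends Expr

object CollectVsCast {
  // Keeps only Named elements; anything else is silently dropped.
  def viaCollect(exprs: Seq[Expr]): Seq[Named] =
    exprs.collect { case n: Named => n }

  // The element type is erased at runtime, so this cast itself never fails.
  // A wrongly typed element only surfaces later, when it is used as a Named.
  def viaCast(exprs: Seq[Expr]): Seq[Named] =
    exprs.asInstanceOf[Seq[Named]]
}
```

With a mixed input, `viaCollect` returns a shorter sequence, while `viaCast` returns all elements and defers any `ClassCastException` to the point of use.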


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148720590
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.planning.SelectedField
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
--- End diff --

Oh. I just thought `FieldExtractionPushdown` doesn't necessarily need to be a 
`Rule`; it could be just a simple trait. `AggregateFieldExtractionPushdown` and 
`JoinFieldExtractionPushdown` would then extend both `Rule` and 
`FieldExtractionPushdown`. Not a big deal.
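
The trait-based layout suggested here could look roughly like the sketch below. `Plan`, `Rule` and `FieldExtractionHelpers` are simplified stand-ins defined only for this example; they are not the Catalyst `Rule` or the classes in this PR.

```scala
// Stand-ins for the optimizer types; NOT Spark's classes.
final case class Plan(description: String)

abstract class Rule[T] {
  def apply(plan: T): T
}

// Shared helpers live in a plain trait that is not itself a Rule.
trait FieldExtractionHelpers {
  protected def tag(plan: Plan, label: String): Plan =
    plan.copy(description = s"$label(${plan.description})")
}

// Each concrete rule extends Rule and mixes in the helpers.
object AggregatePushdownSketch extends Rule[Plan] with FieldExtractionHelpers {
  override def apply(plan: Plan): Plan = tag(plan, "aggregate-pushdown")
}

object JoinPushdownSketch extends Rule[Plan] with FieldExtractionHelpers {
  override def apply(plan: Plan): Plan = tag(plan, "join-pushdown")
}
```

Both objects can still be placed in a `Seq[Rule[Plan]]`, so either layout lets the concrete rules be plugged into a batch.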


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148719962
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and array lengths of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

> Can you describe clearly in comment that this is used in nested column 
> pruning?

Will do.

> ProjectionOverPrunedSchema ?

It's a thought. Technically this supports projection over any schema, 
pruned or otherwise. But its reason for existence is schema pruning.
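
For context on the "Scala extractor" wording, the following toy sketch shows an extractor that is parameterised by a schema and used in a pattern match. `FieldPath`, `ProjectionOver` and `describe` are invented for this example and only mimic the shape of the idea; they do not reproduce what `ProjectionOverSchema` does to Catalyst expressions.

```scala
// Toy expression: a dotted field path such as "name.first". NOT a Catalyst expression.
final case class FieldPath(parts: List[String])

// An extractor instance parameterised by a schema, modelled here as the set
// of field paths that the schema contains.
final case class ProjectionOver(schemaPaths: Set[List[String]]) {
  def unapply(path: FieldPath): Option[FieldPath] =
    if (schemaPaths.contains(path.parts)) Some(path) else None
}

object ExtractorDemo {
  // A value with an `unapply` method can be used directly as a pattern.
  def describe(projection: ProjectionOver, path: FieldPath): String =
    path match {
      case projection(p) => s"kept ${p.parts.mkString(".")}"
      case _ => "not in schema"
    }
}
```

Nothing in the extractor depends on the schema having been pruned, which matches the point that it works over any schema.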


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148718059
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and array lengths of complex type extractors and attributes
--- End diff --

I don't remember why I used the term "array lengths" here, but I believe it 
pertains to the `numFields` field of the `GetArrayStructFields` case class. 
Projecting over an array of structs may filter out one or more fields in that 
struct. In that case, `numFields` needs to be adjusted.
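
A minimal sketch of that adjustment, under simplifying assumptions: a struct is modelled as an ordered list of field names, and `ArrayStructFieldRef` and `rebind` are hypothetical names for this example rather than Catalyst APIs. It only shows how the ordinal and the field count would be recomputed against a pruned struct.

```scala
// Simplified model of an extractor over an array of structs: it records the
// field's name, its position in the struct, and the struct's field count.
final case class ArrayStructFieldRef(fieldName: String, ordinal: Int, numFields: Int)

object PrunedProjection {
  // Recomputes ordinal and numFields against the pruned struct's field list.
  // Returns None if the field was pruned away.
  def rebind(ref: ArrayStructFieldRef, prunedFields: Seq[String]): Option[ArrayStructFieldRef] = {
    val idx = prunedFields.indexOf(ref.fieldName)
    if (idx < 0) None
    else Some(ref.copy(ordinal = idx, numFields = prunedFields.length))
  }
}
```

For instance, a reference to the third of three fields becomes a reference to the first of one field once the other two are pruned.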

I'll revise the doc comment.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148717122
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, Seq(),
+  join @ Join(left, right, joinType, Some(joinCondition))) =>
+val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
+
+  // Construct the new projections and join condition by substituting each GetStructField
+  // expression with a reference to its alias
+  val newProjects =
+projects.map(substituteAttributes).collect { case named: NamedExpression => named }
--- End diff --

Please see my reply to the same question for 
`AggregateFieldExtractionPushdown`.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148717085
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.planning.SelectedField
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
--- End diff --

Not sure what you're getting at, but this seems reasonable to me. 
`FieldExtractionPushdown` is part of an abstract class hierarchy that supports 
`AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`. The 
latter two must be instances of `Rule[LogicalPlan]` to be plugged into the 
optimizer.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148716702
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+        val expressions = groupingExpressions ++ aggregateExpressions
+        val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+        val childAttributes = AttributeSet(child.expressions)
+        val fieldExtractors0 =
+          expressions
+            .flatMap(getFieldExtractors)
+            .distinct
+        val fieldExtractors1 =
+          fieldExtractors0
+            .filter(_.collectFirst { case att: Attribute => att }
+              .filter(attributes.contains).isEmpty)
+        val fieldExtractors =
+          fieldExtractors1
+            .filter(_.collectFirst { case att: Attribute => att }
+              .filter(childAttributes.contains).nonEmpty)
+
+        if (fieldExtractors.nonEmpty) {
+          val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
+
+          // Construct the new grouping and aggregate expressions by substituting
+          // each GetStructField expression with a reference to its alias
+          val newAggregateExpressions =
+            aggregateExpressions.map(substituteAttributes)
+              .collect { case named: NamedExpression => named }
--- End diff --

I'm not sure exactly what you're asking, but the compiler infers that 
`newAggregateExpressions` is `Seq[NamedExpression]` because of the type 
signature of `{ case named: NamedExpression => named }`. We don't need any kind 
of type casting here.
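
A small standalone illustration of that inference, using hypothetical stand-in types rather than the Catalyst ones (`Expression`, `NamedExpression`, `Literal` and `Alias` below are defined only for this sketch):

```scala
object CollectNarrowingSketch {
  // Hypothetical stand-ins; NOT the Catalyst classes of the same names.
  trait Expression
  trait NamedExpression extends Expression { def name: String }
  final case class Literal(value: Int) extends Expression
  final case class Alias(child: Expression, name: String) extends NamedExpression

  // The substitution step hands back plain Expressions.
  val substituted: Seq[Expression] =
    Seq(Alias(Literal(1), "a"), Literal(2), Alias(Literal(3), "b"))

  // The partial function's result type is NamedExpression, so `collect`
  // yields Seq[NamedExpression] without any cast.
  val named: Seq[NamedExpression] =
    substituted.collect { case n: NamedExpression => n }
}
```

One thing worth keeping in mind with this pattern: `collect` silently drops any element the partial function does not match (the `Literal(2)` above), so it both narrows the type and filters.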


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-03 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148716194
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+        val expressions = groupingExpressions ++ aggregateExpressions
+        val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+        val childAttributes = AttributeSet(child.expressions)
+        val fieldExtractors0 =
+          expressions
+            .flatMap(getFieldExtractors)
+            .distinct
+        val fieldExtractors1 =
+          fieldExtractors0
+            .filter(_.collectFirst { case att: Attribute => att }
+              .filter(attributes.contains).isEmpty)
--- End diff --

The attribute `a` is not in `expressions`, so it is not in `attributes`. 
When we construct `attributes`, we simply collect instances of `Attribute`. We 
don't do any recursion.

Your query is tested by the "basic aggregate field extraction pushdown" 
test in `AggregateFieldExtractionPushdownSuite`. It's a little difficult to see 
because I'm using the Catalyst DataFrame DSL. This seems to be the convention 
in these tests, though.
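
The difference between collecting over the sequence and collecting over each expression tree can be sketched as follows. This is a toy model under stated assumptions: `Expr`, `Attribute`, `GetStructField` and `deepAttributes` are hypothetical miniatures written for this example, not the Catalyst classes.

```scala
object ShallowCollectSketch {
  // Hypothetical miniature expression tree; NOT the Catalyst classes.
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attribute(name: String) extends Expr {
    val children: Seq[Expr] = Nil
  }
  final case class GetStructField(child: Expr, field: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }

  // Recursive collection over a whole tree, for contrast with Seq.collect.
  def deepAttributes(e: Expr): Seq[Attribute] = e match {
    case a: Attribute => Seq(a)
    case other => other.children.flatMap(deepAttributes)
  }

  // Two expressions: the field extraction `a.b` and the bare attribute `c`.
  val expressions: Seq[Expr] =
    Seq(GetStructField(Attribute("a"), "b"), Attribute("c"))

  // Seq.collect inspects only the top-level elements, so `a` is not found.
  val shallow: Seq[Attribute] = expressions.collect { case att: Attribute => att }

  // Walking each tree finds `a` as well.
  val deep: Seq[Attribute] = expressions.flatMap(deepAttributes)
}
```

Here `shallow` contains only `c`, while `deep` contains both `a` and `c`, which is the distinction the comment above relies on.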


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148687395
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.planning.SelectedField
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
--- End diff --

> nit: Does this need to be a Rule?

As opposed to?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148456230
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalactic.Equality
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+private[sql] trait FileSchemaPruningTest {
+  _: Assertions =>
+
+  private val schemaEquality = new Equality[StructType] {
+    override def areEqual(a: StructType, b: Any) =
+      b match {
+        case otherType: StructType => a sameType otherType
+        case _ => false
+      }
+  }
+
+  protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
+    val fileSourceScanSchemata =
--- End diff --

`fileSourceScanSchemata` -> `fileSourceScanSchema`? The same applies to the occurrences below.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148455168
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field1: integer (nullable = true)
+  //  |    |-- field2: array (nullable = true)
+  //  |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field3: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield3: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = true)
+  //  |    |-- field4: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field5: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: struct (nullable = false)
+  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield2: struct (nullable = true)
+  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
+  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
+  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field6: struct (nullable = true)
+  //  |    |    |-- subfield1: string (nullable = false)
+  //  |    |    |-- subfield2: string (nullable = true)
+  //  |    |-- field7: struct (nullable = true)
+  //  |    |    |-- subfield1: struct (nullable = true)
+  //  |    |    |    |-- subsubfield1: integer (nullable = true)
+  //  |    |    |    |-- subsubfield2: integer (nullable = true)
+  //  |    |-- field8: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: array (valueContainsNull = false)
+  //  |    |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field9: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  |    |-- element: struct (containsNull = false)
+  //  |    |    |-- field1: struct (nullable = true)
+  //  |    |    |    |-- subfield1: integer (nullable = false)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |-- field2: map (nullable = true)
+  //  |    |    |    |-- key: string
+  //  |    |    |    |-- value: integer 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148454584
  

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148455060
  

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148452868
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case op @ PhysicalOperation(projects, filters,
+          l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+            dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
+        val projectionFields = projects.flatMap(getFields)
+        val filterFields = filters.flatMap(getFields)
+        val requestedFields = (projectionFields ++ filterFields).distinct
+
+        // If [[requestedFields]] includes a nested field, continue. Otherwise,
+        // return [[op]]
+        if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+          val prunedSchema = requestedFields
+            .map { case (field, _) => StructType(Array(field)) }
+            .reduceLeft(_ merge _)
+          val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+          val prunedDataSchema =
+            StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+
+          // If the data schema is different from the pruned data schema, continue. Otherwise,
+          // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
+          // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
+          // in [[dataSchema]].
+          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+            val prunedParquetRelation =
+              hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+            // We need to replace the expression ids of the pruned relation output attributes
+            // with the expression ids of the original relation output attributes so that
+            // references to the original relation's output are not broken
+            val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap
+            val prunedRelationOutput =
--- End diff --

Will pruning change the top-level attributes and produce a different output 
for the logical relation? I thought only the data types of the nested columns 
would change.
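
For readers following the quoted hunk, the id-preserving step its code comment describes can be pictured with a toy model. This is only a sketch under stated assumptions: `Att` and `reuseIds` are hypothetical names invented here, and data types are plain strings rather than Spark `DataType`s. It illustrates the case raised above, where a top-level attribute keeps its name and id while its nested type narrows.

```scala
object ExprIdReuseSketch {
  // Hypothetical stand-in for an output attribute: a name, a type and an id.
  final case class Att(name: String, dataType: String, exprId: Long)

  // Give each pruned attribute the id of the same-named original attribute,
  // keeping its own (narrower) data type. Unmatched names keep their own id.
  def reuseIds(original: Seq[Att], pruned: Seq[Att]): Seq[Att] = {
    val idByName = original.map(att => att.name -> att.exprId).toMap
    pruned.map(att => idByName.get(att.name).fold(att)(id => att.copy(exprId = id)))
  }
}
```

In this model, pruning `name` from `struct<first:string,last:string>` to `struct<first:string>` leaves the attribute's name and id intact, so existing references to it by id still resolve.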


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148450744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -127,8 +127,8 @@ private[parquet] class ParquetRowConverter(
   extends ParquetGroupConverter(updater) with Logging {
 
   assert(
-    parquetType.getFieldCount == catalystType.length,
-    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+    parquetType.getFieldCount <= catalystType.length,
--- End diff --

Why can it be less than the Catalyst type's length now?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148450632
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
       StructType.fromString(schemaString)
     }
 
-    val parquetRequestedSchema =
+    val clippedParquetSchema =
       ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+    val parquetRequestedSchema = if (parquetMrCompatibility) {
+      // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+      // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --

Can you give an example where this would fail? We didn't change 
`clipParquetSchema`, so even when pruning happens, why would we read a superset 
of the file's schema and trigger the exception the comment describes? Pruning 
only removes existing fields from the file's schema; it doesn't add new ones, right?
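
As a reference point for this discussion, the intersection that the quoted code comment describes can be sketched on a toy schema model. Everything here is an assumption made for illustration: `Node`, `Leaf`, `Group` and `intersect` are hypothetical and are not Parquet's or Spark's schema types. The sketch shows only the operation itself, not whether the clipped schema can in fact contain fields that are absent from the file.

```scala
object SchemaIntersectionSketch {
  // Hypothetical toy schema model; NOT Parquet's MessageType or Spark's StructType.
  sealed trait Node
  case object Leaf extends Node
  final case class Group(fields: Map[String, Node]) extends Node

  // Keep only the fields that appear in both schemas, recursing into groups.
  // A field whose kind differs between the two schemas is dropped.
  def intersect(requested: Group, file: Group): Group = {
    val kept: Seq[(String, Node)] = requested.fields.toSeq.flatMap { case (name, node) =>
      val result: Option[(String, Node)] = (node, file.fields.get(name)) match {
        case (r: Group, Some(f: Group)) => Some(name -> intersect(r, f))
        case (Leaf, Some(Leaf)) => Some(name -> Leaf)
        case _ => None
      }
      result
    }
    Group(kept.toMap)
  }
}
```

Note that in this toy version a group with no fields in common with the file survives as an empty group; a real implementation would need to decide how to treat that case.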


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148456140
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalactic.Equality
+import org.scalatest.Assertions
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+private[sql] trait FileSchemaPruningTest {
+  _: Assertions =>
+
+  private val schemaEquality = new Equality[StructType] {
+    override def areEqual(a: StructType, b: Any) =
+      b match {
+        case otherType: StructType => a sameType otherType
+        case _ => false
+      }
+  }
+
+  protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
--- End diff --

`checkScanSchemata` -> `checkScanSchema`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148452275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned relation output attributes
+// with the expression ids of the original relation output attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = prunedRelationOutput,
+catalogTable = None)
--- End diff --

Why drop the original `catalogTable`, if there is one?
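
The expression-id remapping in the diff quoted above can be illustrated with a minimal, self-contained sketch. `Attr` and `remapIds` are hypothetical stand-ins invented for this example and are not Catalyst's attribute API. The only point is that an attribute of the pruned relation that matches an original output attribute by name takes over the original id, so operators that already reference the old ids still resolve.

```scala
// Toy stand-in for a Catalyst attribute: a name plus an expression id.
// (Hypothetical type, used only for illustration.)
final case class Attr(name: String, exprId: Long)

object ExprIdRemap {
  // Reuse the original exprId for every pruned attribute whose name matches
  // an original output attribute; leave unmatched attributes untouched.
  def remapIds(original: Seq[Attr], pruned: Seq[Attr]): Seq[Attr] = {
    val idByName = original.map(a => a.name -> a.exprId).toMap
    pruned.map(a => idByName.get(a.name).fold(a)(id => a.copy(exprId = id)))
  }
}
```

Matching by name, as in this sketch, assumes output names are unique within the relation.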


---


[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148437575
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
+val fieldExtractors =
+  fieldExtractors1
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(childAttributes.contains).nonEmpty)
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
--- End diff --



We can return the original plan if `aliases` is empty.



---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148446149
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
--- End diff --

Please document that this rule is specialized for pushing down pruned nested columns.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148446166
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
--- End diff --

Please document that this rule is specialized for pushing down pruned nested columns.



---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148434529
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and array lengths of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

Can you state clearly in the comment that this is used for nested column pruning?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148446321
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.planning.SelectedField
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
--- End diff --

nit: Does this need to be a `Rule`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148434648
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and array lengths of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

`ProjectionOverPrunedSchema`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148433577
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and array lengths of complex type extractors and attributes
--- End diff --

Do we actually change array lengths? I only see adjustments to data types and
field indexes.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148446396
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
--- End diff --

Oh, I just meant we may skip most of the rule if it can't be applied. 
Please see my comment below.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148437069
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, Seq(),
+  join @ Join(left, right, joinType, Some(joinCondition))) =>
+val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
+
+  // Construct the new projections and join condition by substituting each GetStructField
+  // expression with a reference to its alias
+  val newProjects =
+projects.map(substituteAttributes).collect { case named: NamedExpression => named }
--- End diff --

`asInstanceOf[Seq[NamedExpression]]`?
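
The trade-off behind this suggestion can be shown with plain Scala collections, without Spark. In the sketch below, `Node`, `NamedNode` and `LiteralNode` are hypothetical types made up for the example. `collect` with a type pattern silently drops elements that do not match, while `asInstanceOf[Seq[T]]` keeps every element but is unchecked because the element type is erased at runtime, so a mismatched element only fails later, when it is used.

```scala
// Hypothetical expression hierarchy, for illustration only.
sealed trait Node
final case class NamedNode(name: String) extends Node
final case class LiteralNode(value: Int) extends Node

object CollectVsCast {
  val mixed: Seq[Node] = Seq(NamedNode("a"), LiteralNode(1), NamedNode("b"))

  // collect keeps only the elements that match the type pattern.
  val collected: Seq[NamedNode] = mixed.collect { case n: NamedNode => n }

  // The cast itself always succeeds: the element type is erased at runtime.
  val cast: Seq[NamedNode] = mixed.asInstanceOf[Seq[NamedNode]]

  // Using a mismatched element is what finally throws.
  def castFailsOnUse: Boolean =
    try { cast.map(_.name); false }
    catch { case _: ClassCastException => true }
}
```

So the two forms are interchangeable only if every substituted expression is guaranteed to be a `NamedExpression`. Whether that holds for `substituteAttributes` is not established by the quoted diff.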


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148436498
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
+val fieldExtractors =
+  fieldExtractors1
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(childAttributes.contains).nonEmpty)
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
+
+  // Construct the new grouping and aggregate expressions by substituting
+  // each GetStructField expression with a reference to its alias
+  val newAggregateExpressions =
+aggregateExpressions.map(substituteAttributes)
+  .collect { case named: NamedExpression => named }
--- End diff --

`asInstanceOf[Seq[NamedExpression]]`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148437549
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, Seq(),
+  join @ Join(left, right, joinType, Some(joinCondition))) =>
+val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct
+
+if (fieldExtractors.nonEmpty) {
+  val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
--- End diff --

We can return the original plan if `aliases` is empty.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r148435955
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
--- End diff --

If the query is:

```sql
select a.b, count(1) from r1 group by a.b
```

`fieldExtractors0` gets `GetStructField` `a.b`. Won't `fieldExtractors1` 
filter it out, because the attribute `a` is contained in the attribute set of 
all expressions? But we don't need all fields of `a` now.
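
One detail that may help when checking this case: in the quoted code, `attributes` is built by calling `collect` on a `Seq` of expressions, which only matches elements that are themselves attributes, whereas `_.collectFirst` on an expression searches that expression's tree. The sketch below uses hypothetical toy types (`Expr`, `AttrRef`, `GetField`) rather than Catalyst's classes, so it illustrates the distinction only. How the real rule treats `a.b` depends on how Catalyst represents the grouping and aggregate expressions.

```scala
// Hypothetical expression tree, for illustration only.
sealed trait Expr { def children: Seq[Expr] }
final case class AttrRef(name: String) extends Expr {
  val children: Seq[Expr] = Nil
}
final case class GetField(child: Expr, field: String) extends Expr {
  val children: Seq[Expr] = Seq(child)
}

object TopLevelVsNested {
  // Seq.collect inspects only the elements of the sequence itself.
  def topLevelAttrs(exprs: Seq[Expr]): Set[AttrRef] =
    exprs.collect { case a: AttrRef => a }.toSet

  // Depth-first search through the tree, loosely analogous to calling
  // collectFirst on a tree node.
  def firstAttr(e: Expr): Option[AttrRef] = e match {
    case a: AttrRef => Some(a)
    case other =>
      other.children.iterator.map(firstAttr).collectFirst { case Some(a) => a }
  }
}
```

In this toy model, `a.b` on its own contributes nothing to the top-level attribute set, so a filter that tests the extractor's root attribute against that set keeps it. The extractor is filtered out only when `a` also appears as a bare top-level expression.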


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-10-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r142140412
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
--- End diff --

I've updated the logic for comparing the original schema to the pruned 
schema:


https://github.com/apache/spark/blob/52fddc181f32726cea1dd12a23ebf7201986be01/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala#L53-L57

The following test validates that the selection order is irrelevant in 
comparing the pruned schema to the original schema:


https://github.com/apache/spark/blob/52fddc181f32726cea1dd12a23ebf7201986be01/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L82-L107
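
For readers without the linked commit at hand, here is a minimal sketch of why selection order can matter when comparing schemas. It is a toy model, not the code in the commit: `Schema`, `pruneInSelectionOrder` and `pruneInSchemaOrder` are hypothetical names, and a schema is reduced to an ordered list of top-level field names.

```scala
object SchemaOrderSketch {
  // Toy schema: an ordered list of top-level field names (not Spark's StructType).
  final case class Schema(fields: Vector[String])

  // Naive pruning: keeps fields in the order the query happened to select them.
  def pruneInSelectionOrder(selected: Seq[String]): Schema =
    Schema(selected.distinct.toVector)

  // Order-insensitive pruning: keeps the selected fields in the original schema's order.
  def pruneInSchemaOrder(original: Schema, selected: Seq[String]): Schema = {
    val wanted = selected.toSet
    Schema(original.fields.filter(wanted.contains))
  }

  def main(args: Array[String]): Unit = {
    val original = Schema(Vector("a", "b"))
    val selected = Seq("b", "a") // e.g. `select b, a from t`

    // Same columns, different order: a naive equality check reports a spurious difference.
    println(pruneInSelectionOrder(selected) == original)
    // Re-ordering by the original schema makes the comparison order-insensitive.
    println(pruneInSchemaOrder(original, selected) == original)
  }
}
```

With an order-sensitive comparison, selecting every column in a different order looks like a pruned schema even though nothing was removed. Normalizing to the original order is only one possible way to avoid that.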


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-10-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r142119164
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
   // expressions
-  case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

> I don't believe this is covered in the current unit tests, so I will add 
or modify a test to cover it.

I added the following test to cover this case:


https://github.com/apache/spark/blob/88786d3eaf9d3a3d2c80c2235c5014074ade3dc1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L62-L79


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-10-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r142117188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --

As I wrote below, this problem still exists with parquet-mr 1.8.2, so I've 
reverted to the version with the `parquetMrCompatibility`-related 
logic.
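
To make the intersection step in the diff comment concrete, here is a rough sketch of intersecting a requested schema with a file schema so that nothing outside the file is ever requested. It is a toy model under stated assumptions: `Node`, `Leaf`, `Group` and `intersect` are hypothetical stand-ins, not parquet-mr types and not the PR's `intersectParquetGroups`. The only detail taken from the diff is that the result is optional, with an empty fallback.

```scala
object SchemaIntersectionSketch {
  // Toy schema tree; real Parquet schemas carry types and repetition levels too.
  sealed trait Node { def name: String }
  final case class Leaf(name: String) extends Node
  final case class Group(name: String, fields: List[Node]) extends Node

  // Keeps only the parts of `requested` that also exist in `file`.
  // Returns None when no field of the group survives.
  def intersect(requested: Group, file: Group): Option[Group] = {
    val kept: List[Node] = requested.fields.flatMap { field =>
      val matched: Option[Node] =
        file.fields.find(_.name == field.name).flatMap { fileField =>
          (field, fileField) match {
            case (r: Group, f: Group) => intersect(r, f)
            case (r: Leaf, _: Leaf) => Some(r)
            case _ => None // group/leaf mismatch: drop the field
          }
        }
      matched
    }
    if (kept.isEmpty) None else Some(Group(requested.name, kept))
  }

  def main(args: Array[String]): Unit = {
    val file = Group("root", List(Group("name", List(Leaf("first"), Leaf("last")))))
    // `name.middle` and `address` are requested but absent from the file.
    val requested = Group("root",
      List(Group("name", List(Leaf("first"), Leaf("middle"))), Leaf("address")))
    println(intersect(requested, file))
  }
}
```

In this sketch the fields missing from the file are dropped from the request rather than passed to the reader.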


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-22 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140611282
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. Otherwise,
--- End diff --

I suggest "sub-field".


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140368054
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
   // expressions
-  case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

> Incidentally, I have noticed that I need to update the code comments 
where I made the changes. I will push a commit with revised comments.

I've pushed the commit with the revised documentation: 
https://github.com/apache/spark/pull/16578/commits/38cec5f05066f75d868b64d360b04edb18dcebeb.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140366890
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
--- End diff --

> Why do we need to trim the extractors which contain attributes referred 
from `groupingExpressions ++ aggregateExpressions`?

Consider this query:

```sql
select a, a.b, count(1) from r1 group by a, a.b
```

The grouping fields are `a` and `a.b`. `a` is an `Attribute`. `a.b` is a 
`GetStructField`. Since we need all of `a` to answer this query, it doesn't 
make sense to attempt to push down `a.b`. At the same time, `fieldExtractors0` 
includes all `GetStructField` instances. This includes `a.b`. The code you 
refer to above filters out the `a.b` `GetStructField` because our query 
requires all of `a`.

If we do not filter out `a.b`, then the child (projection) of the new 
`Aggregate` will not contain `a` in its `output`. The query planner will barf.

The logic for the creation of the new child projection is here: 
https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala#L63-L70

This case is tested by 
https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdownSuite.scala#L60-L76
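
As a rough illustration of the filtering described above, here is a self-contained sketch. It uses toy stand-ins rather than Catalyst classes: `Expr`, `Attr`, `GetField`, `rootAttr` and `pushable` are hypothetical names, and unlike `getFieldExtractors` the sketch only looks at top-level expressions.

```scala
object ExtractorFilterSketch {
  // Toy stand-ins for Catalyst expressions; these are NOT Spark classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class GetField(child: Expr, field: String) extends Expr

  // Root attribute that a (possibly nested) field extractor reads from.
  @annotation.tailrec
  def rootAttr(e: Expr): Attr = e match {
    case a: Attr => a
    case GetField(child, _) => rootAttr(child)
  }

  // Keeps only extractors whose root attribute is not itself required whole.
  def pushable(expressions: Seq[Expr]): Seq[GetField] = {
    val wholeAttrs = expressions.collect { case a: Attr => a }.toSet
    expressions
      .collect { case g: GetField => g }
      .distinct
      .filterNot(g => wholeAttrs.contains(rootAttr(g)))
  }

  def main(args: Array[String]): Unit = {
    val a = Attr("a")
    val c = Attr("c")
    // Like `group by a, a.b, c.d`: `a` is needed whole, so `a.b` is not pushed down.
    println(pushable(Seq(a, GetField(a, "b"), GetField(c, "d"))))
  }
}
```

Here `a.b` is dropped from the pushdown candidates because `a` is selected in full, while `c.d` remains a candidate.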


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140358379
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --

I had to go back in time to figure this one out. I wrote this code for 
parquet-mr 1.7.0, and indeed this code is necessary for that version of the 
library as validated by a test in `ParquetSchemaPruningSuite`. However, 
parquet-mr version 1.8.2 no longer has this limitation/bug. Therefore, I will 
remove the `parquetMrCompatibility`-related logic from this file.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140357267
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
--- End diff --

I'm not sure I know what you mean. This test 
https://github.com/apache/spark/blob/38cec5f05066f75d868b64d360b04edb18dcebeb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdownSuite.scala#L53-L61
 checks that `JoinFieldExtractionPushdown` does not modify a join not involving 
nested fields.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140351256
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

We do use it in the vectorized reader, but it's not easy to see. It's 
instantiated here: 
https://github.com/apache/spark/blob/38cec5f05066f75d868b64d360b04edb18dcebeb/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L141
 via reflection.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140338751
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
   // expressions
-  case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

Incidentally, I have noticed that I need to update the code comments where 
I made the changes. I will push a commit with revised comments.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140338601
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
   // expressions
-  case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

I think the best way to explain is through an example. Suppose we have a 
Parquet file with two string columns: `name.first` and `name.last`. We'll 
register this file as a temp view with name `people`. Now consider the 
following query:

```sql
select name.first from people where name.first = 'Michael'
```

The only column we need to read from the file to answer this query is 
`name.first`.

Without the change to `QueryPlanConstraints.scala`, the optimized plan will 
look like:

```
Project [name#19.first AS first#30]
+- Filter (isnotnull(name#19) && (name#19.first = Michael))
   +- Relation[name#19] parquet
```

Evaluating `isnotnull(name#19)` requires the entire name struct. So both 
`name.first` and `name.last` will be read.

With the change, the optimized plan will look like:

```
Project [name#19.first AS first#30]
+- Filter (isnotnull(name#19.first) && (name#19.first = Michael))
   +- Relation[name#19] parquet
```

Now the `isnotnull` expression requires only the `name.first` column. The 
`name.last` column will not be read.

I don't believe this is covered in the current unit tests, so I will add or 
modify a test to cover it.

Note that in either case the `LogicalRelation` is shown as 
`Relation[name#19] parquet`, which is confusing. The string output for a 
logical relation shows attributes, not fields. Perhaps we should do something 
about that, so that in the latter case it would print 
`Relation[name#19.first] parquet` instead?
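
The difference between the two plans can be sketched with a small toy model that computes which leaf columns a filter needs. Everything here is an assumption made for illustration: the expression classes, `leaves` and `requiredLeaves` are hypothetical stand-ins, not Catalyst code.

```scala
object IsNotNullPruningSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class GetField(child: Expr, field: String) extends Expr
  final case class IsNotNull(child: Expr) extends Expr
  final case class EqualTo(left: Expr, value: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Assumed file layout: one struct column `name` with two leaf columns.
  val leaves: Map[String, Seq[String]] = Map("name" -> Seq("name.first", "name.last"))

  // Dotted path of an attribute or field access, if the expression is one.
  def path(e: Expr): Option[String] = e match {
    case Attr(n) => Some(n)
    case GetField(c, f) => path(c).map(p => s"$p.$f")
    case _ => None
  }

  // Leaf columns that must be read to evaluate `e`.
  def requiredLeaves(e: Expr): Set[String] = e match {
    case Attr(n) => leaves.getOrElse(n, Seq(n)).toSet // a whole struct needs all its leaves
    case g: GetField => path(g).toSet
    case IsNotNull(c) => requiredLeaves(c)
    case EqualTo(l, _) => requiredLeaves(l)
    case And(l, r) => requiredLeaves(l) ++ requiredLeaves(r)
  }

  def main(args: Array[String]): Unit = {
    val first = GetField(Attr("name"), "first")
    // isnotnull(name) && name.first = 'Michael'
    val before = And(IsNotNull(Attr("name")), EqualTo(first, "Michael"))
    // isnotnull(name.first) && name.first = 'Michael'
    val after = And(IsNotNull(first), EqualTo(first, "Michael"))
    println(requiredLeaves(before))
    println(requiredLeaves(after))
  }
}
```

In this model the first filter needs both leaves of `name`, while the second needs only `name.first`.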


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r140226905
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
--- End diff --

> Can the merging order of the fields affect the comparison later in line 
57?

Not the merging order, no. However, the selection order can, leading to 
unnecessary "pruning". I will address this issue and add a test case to verify 
it.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139346525
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --

Is the exception only thrown when we read a superset of fields for a nested 
column? E.g., there is a struct of two fields "a" and "b", and we try to read 
"c".


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139337054
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
+ * aggregate expressions into a projection over its children. The original
+ * [[expressions.GetStructField]] expressions are replaced with references to the pushed down
+ * aliases.
+ */
+object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
+val expressions = groupingExpressions ++ aggregateExpressions
+val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
+val childAttributes = AttributeSet(child.expressions)
+val fieldExtractors0 =
+  expressions
+.flatMap(getFieldExtractors)
+.distinct
+val fieldExtractors1 =
+  fieldExtractors0
+.filter(_.collectFirst { case att: Attribute => att }
+  .filter(attributes.contains).isEmpty)
--- End diff --

Why do we need to trim the extractors which contain attributes referred 
from `groupingExpressions ++ aggregateExpressions`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139336399
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+
+/**
+ * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
+ * and its join condition. The original [[expressions.GetStructField]] expressions are replaced
+ * with references to the pushed down aliases.
+ */
+object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
--- End diff --

This rule applies generally. But for cases that do not involve nested column 
pruning, it seems to be a burden and makes the query plan more complicated.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139333125
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
--- End diff --

We should add related tests for such cases.


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139331293
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
--- End diff --

`clipParquetSchema` can add column paths that exist only in the Catalyst 
schema to the Parquet schema. I think those columns are useful, but it looks 
like `intersectParquetGroups` will remove them again?
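
To make the question concrete, here is a minimal sketch in plain Scala of the interaction being asked about. It is a model only: `ClipIntersectSketch`, `clip`, and `intersect` are hypothetical names, a schema is reduced to a list of column paths, and the behavior of `intersect` is assumed from the diff comment ("we intersect our clipped schema with the underlying file's schema"), not taken from the actual `intersectParquetGroups` code.

```scala
object ClipIntersectSketch {
  // Hypothetical model: a schema is just an ordered list of column paths.
  type Schema = Seq[String]

  // Clipping keeps every requested column, including ones the file lacks.
  // (In this simplified model the file schema is not consulted at all.)
  def clip(fileSchema: Schema, requested: Schema): Schema = requested

  // Intersecting keeps only the clipped columns that the file contains.
  def intersect(clipped: Schema, fileSchema: Schema): Schema =
    clipped.filter(column => fileSchema.contains(column))

  def main(args: Array[String]): Unit = {
    val fileSchema = Seq("name.first", "name.last")
    val requested = Seq("name.first", "name.middle")
    val clipped = clip(fileSchema, requested)
    println(clipped)                        // List(name.first, name.middle)
    println(intersect(clipped, fileSchema)) // List(name.first)
  }
}
```

Under these assumptions, a column that exists only in the requested schema survives clipping and is then dropped by the intersection.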


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139331198
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, 
context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, 
intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some 
cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

I think "the built-in Parquet reader" means the vectorized reader. But I 
don't think we use `ParquetReadSupport` in the vectorized reader?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139312657
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the 
child null intolerant
   // expressions
-  case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => 
scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, 
null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push 
IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

Previously, IsNotNull constraints were Attribute-specific. Why do we need to 
expand them to `ExtractValue`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread vkhristenko
Github user vkhristenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139312613
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
--- End diff --

@viirya @mallman 

If I may add here, given a schema:
```
root
| - a: StructType
|| - f1: Int
|| - f2: Int
```
the selections `df.select("a.f1", "a.f2")` and `df.select("a.f2", "a.f1")`
will produce different requiredSchemas, which are fed into buildReader upon 
some action.

In the first case it will be `StructType( StructField( "a", 
StructType(StructField("f1") :: StructField("f2") :: Nil)) :: Nil)` and in the 
second `StructType( StructField( "a", StructType(StructField("f2") :: 
StructField("f1") :: Nil)) :: Nil)`,

which means that in the second case the required schema differs from the 
original schema. But as long as fields f1 and f2 are split (each can be read 
without reading the other), they can be remapped at the data source level.
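
A minimal sketch of what such a remapping could look like, in plain Scala with no Spark dependency. The names `FieldRemapSketch`, `remapIndices`, and `remapRow` are hypothetical, and a decoded struct value is modeled as a plain `Seq[Any]`; this is an illustration of the idea, not how any Spark data source implements it.

```scala
object FieldRemapSketch {
  // For each requested field name, find its position in the file's field order.
  def remapIndices(fileOrder: Seq[String], requested: Seq[String]): Seq[Int] =
    requested.map { name =>
      val idx = fileOrder.indexOf(name)
      require(idx >= 0, s"field $name is not present in the file schema")
      idx
    }

  // Reorder one decoded struct value so that it matches the requested order.
  def remapRow(row: Seq[Any], indices: Seq[Int]): Seq[Any] =
    indices.map(i => row(i))

  def main(args: Array[String]): Unit = {
    val fileOrder = Seq("f1", "f2") // order in the original schema
    val requested = Seq("f2", "f1") // order from df.select("a.f2", "a.f1")
    val indices = remapIndices(fileOrder, requested)
    println(remapRow(Seq(10, 20), indices)) // List(20, 10)
  }
}
```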

VK


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139312181
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
--- End diff --

Can the merging order of the fields affect the comparison later at line 57?

For example, depending on the merging order, if we require `a.field1` and 
`a.field2`, is it possible that we get either `StructType(StructField("a", 
StructType(StructField("field1") :: StructField("field2") :: Nil)) :: Nil)` or 
`StructType(StructField("a", StructType(StructField("field2") :: 
StructField("field1") :: Nil)) :: Nil)` as the merged schema?

If we get the latter and the data schema is the former, at line 57 we may 
think they are different?
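
The concern can be illustrated with a small self-contained sketch. The types below (`DType`, `Struct`, `Field`) and the `normalize` helper are hypothetical stand-ins, not Spark's `StructType`/`StructField`. They mirror only the one property that matters here: Spark's `StructType` compares its fields in order, so two nested structs with the same fields in a different order are not equal.

```scala
// Simplified stand-ins for Spark's StructField/StructType (hypothetical names).
sealed trait DType
case object IntT extends DType
final case class Struct(fields: Seq[Field]) extends DType
final case class Field(name: String, dataType: DType)

object MergeOrderSketch {
  // Order-insensitive normalization: sort fields by name at every nesting level.
  def normalize(t: DType): DType = t match {
    case Struct(fs) =>
      Struct(fs.map(f => f.copy(dataType = normalize(f.dataType))).sortBy(_.name))
    case other => other
  }

  val dataSchema: Struct =
    Struct(Seq(Field("a", Struct(Seq(Field("field1", IntT), Field("field2", IntT))))))
  val merged: Struct =
    Struct(Seq(Field("a", Struct(Seq(Field("field2", IntT), Field("field1", IntT))))))

  def main(args: Array[String]): Unit = {
    // Comparing top-level fields as sets still sees a difference, because the
    // nested struct under "a" lists its fields in a different order.
    println(dataSchema.fields.toSet != merged.fields.toSet) // true
    // After normalizing the field order, the two schemas compare equal.
    println(normalize(dataSchema) == normalize(merged)) // true
  }
}
```

In this model, converting the top-level fields to sets does not help, since the ordering difference sits one level down; a comparison that should ignore field order would need something like the normalization step shown.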


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139311738
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
--- End diff --

`proper field` sounds vague. Maybe something more specific, like `nested columns`?


---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139311726
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+  val parquetDataFields = dataSchema.fields.toSet
+  val prunedDataFields = prunedDataSchema.fields.toSet
+
+  // If the original Parquet relation data fields are different 
from the
+  // pruned data fields, continue. Otherwise, return [[op]]
+  if (parquetDataFields != prunedDataFields) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  LogicalRelation(prunedParquetRelation, prunedRelationOutput, 
None)
+
+val projectionOverSchema = 
ProjectionOverSchema(prunedDataSchema)
+
+// Construct a new target for our projection by rewriting and
+// including the original filters where available
+val projectionChild =
+  if (filters.nonEmpty) {
+val projectedFilters = 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r139311391
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a 
[[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the 
following schema:
+ *
+ *   {{{
+ *   root
+ *|-- name: struct (nullable = true)
+ *||-- first: string (nullable = true)
+ *||-- last: string (nullable = true)
+ *}}}
+ *
+ * Further, suppose we take the select expression `name.first`. This will 
parse into an
+ * `Alias(child, "first")`. Ignoring the alias, `child` matches the 
following pattern:
+ *
+ *   {{{
+ *   GetStructFieldObject(
+ * AttributeReference("name", StructType(_), _, _),
+ * StructField("first", StringType, _, _))
+ *   }}}
+ *
+ * [[SelectedField]] converts that expression into
+ *
+ *   {{{
+ *   StructField("name", StructType(Array(StructField("first", 
StringType
+ *   }}}
+ *
+ * by mapping each complex type extractor to a 
[[org.apache.spark.sql.types.StructField]] with the
+ * same name as its child (or "parent" going right to left in the select 
expression) and a data
+ * type appropriate to the complex type extractor. In our example, the 
name of the child expression
+ * is "name" and its data type is a 
[[org.apache.spark.sql.types.StructType]] with a single string
+ * field named "first".
+ *
+ * @param expr the top-level complex type extractor
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+// If this expression is an alias, work on its child instead
+val unaliased = expr match {
+  case Alias(child, _) => child
+  case expr => expr
+}
+selectField(unaliased, None)
+  }
+
+  private def selectField(expr: Expression, fieldOpt: 
Option[StructField]): Option[StructField] = {
+expr match {
+  // No children. Returns a StructField with the attribute name or 
None if fieldOpt is None.
+  case AttributeReference(name, dataType, nullable, metadata) =>
+fieldOpt.map(field =>
+  StructField(name, wrapStructType(dataType, field), nullable, 
metadata))
+  // Handles case "expr0.field[n]", where "expr0" is of struct type 
and "expr0.field" is of
+  // array type.
+  case GetArrayItem(x @ GetStructFieldObject(child, field @ 
StructField(name,
+  dataType, nullable, metadata)), _) =>
+val childField = fieldOpt.map(field => StructField(name,
+  wrapStructType(dataType, field), nullable, 
metadata)).getOrElse(field)
+selectField(child, Some(childField))
+  // Handles case "expr0.field[n]", where "expr0.field" is of array 
type.
+  case GetArrayItem(child, _) =>
+selectField(child, fieldOpt)
+  // Handles case "expr0.field.subfield", where "expr0" and 
"expr0.field" are of array type.
+  case GetArrayStructFields(child: GetArrayStructFields,
+  field @ StructField(name, dataType, nullable, metadata), _, _, 
_) =>
+val childField = fieldOpt.map(field => StructField(name,
+wrapStructType(dataType, field),
+nullable, metadata)).getOrElse(field)
+selectField(child, Some(childField))
+  // Handles case "expr0.field", where "expr0" is of array type.
+  case GetArrayStructFields(child,
+  field @ StructField(name, dataType, nullable, metadata), _, _, 
containsNull) =>
+val childField =
+  fieldOpt.map(field => StructField(name,
+  

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-09 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r100360523
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field 
from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor 
which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

Let's go with `GetStructFieldObject`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r100232773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field 
from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor 
which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

`GetStructFieldObject` or `GetStructFieldExtractor`?


---



[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-08 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r100229358
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field 
from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor 
which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

How about `GetStructFieldObject`? Or `GetStructFieldRef`?


---



[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-08 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r100229300
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a [[StructField]] from a Catalyst complex type
+ * extractor. This is like the opposite of [[ExtractValue#apply]].
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+// If this expression is an alias, work on its child instead
+val unaliased = expr match {
+  case Alias(child, _) => child
+  case expr => expr
+}
+selectField(unaliased, None)
+  }
+
+  /**
+   * Converts some chain of complex type extractors into a [[StructField]].
+   *
+   * @param expr the top-level complex type extractor
+   * @param fieldOpt the subfield of [[expr]], where relevant
+   */
+  private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] =
+expr match {
+  case AttributeReference(name, _, nullable, _) =>
+fieldOpt.map(field => StructField(name, StructType(Array(field)), nullable))
+  case GetArrayItem(GetStructField2(child, field @ StructField(name,
+  ArrayType(_, arrayNullable), fieldNullable, _)), _) =>
+val childField = fieldOpt.map(field => StructField(name, ArrayType(
+  StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field)
+selectField(child, Some(childField))
+  case GetArrayStructFields(child,
--- End diff --

I've spent some time this week developing a few different solutions to this 
problem, but none of them is easy to understand or verify. I'm going to spend 
some more time working on a simpler solution before posting anything back.





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r99174674
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a [[StructField]] from a Catalyst complex 
type
+ * extractor. This is like the opposite of [[ExtractValue#apply]].
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+// If this expression is an alias, work on its child instead
+val unaliased = expr match {
+  case Alias(child, _) => child
+  case expr => expr
+}
+selectField(unaliased, None)
+  }
+
+  /**
+   * Converts some chain of complex type extractors into a [[StructField]].
+   *
+   * @param expr the top-level complex type extractor
+   * @param fieldOpt the subfield of [[expr]], where relevant
+   */
+  private def selectField(expr: Expression, fieldOpt: 
Option[StructField]): Option[StructField] =
+expr match {
+  case AttributeReference(name, _, nullable, _) =>
+fieldOpt.map(field => StructField(name, StructType(Array(field)), 
nullable))
+  case GetArrayItem(GetStructField2(child, field @ StructField(name,
+  ArrayType(_, arrayNullable), fieldNullable, _)), _) =>
+val childField = fieldOpt.map(field => StructField(name, ArrayType(
+  StructType(Array(field)), arrayNullable), 
fieldNullable)).getOrElse(field)
+selectField(child, Some(childField))
+  case GetArrayStructFields(child,
--- End diff --

I believe I have a fix for this, but I probably won't be able to post a new 
commit until early next week, because I'm working on a proposal for the Spark 
Summit RFP.

Cheers.





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98920657
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field 
from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor 
which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

Agreed. I think the best name in this context is `GetStructField`, but 
that's already taken. I'll keep thinking about a good alternative.





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98859135
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a [[StructField]] from a Catalyst complex 
type
+ * extractor. This is like the opposite of [[ExtractValue#apply]].
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+// If this expression is an alias, work on its child instead
+val unaliased = expr match {
+  case Alias(child, _) => child
+  case expr => expr
+}
+selectField(unaliased, None)
+  }
+
+  /**
+   * Converts some chain of complex type extractors into a [[StructField]].
+   *
+   * @param expr the top-level complex type extractor
+   * @param fieldOpt the subfield of [[expr]], where relevant
+   */
+  private def selectField(expr: Expression, fieldOpt: 
Option[StructField]): Option[StructField] =
+expr match {
+  case AttributeReference(name, _, nullable, _) =>
+fieldOpt.map(field => StructField(name, StructType(Array(field)), 
nullable))
+  case GetArrayItem(GetStructField2(child, field @ StructField(name,
+  ArrayType(_, arrayNullable), fieldNullable, _)), _) =>
+val childField = fieldOpt.map(field => StructField(name, ArrayType(
+  StructType(Array(field)), arrayNullable), 
fieldNullable)).getOrElse(field)
+selectField(child, Some(childField))
+  case GetArrayStructFields(child,
--- End diff --

If we have a chain of `GetArrayStructFields`, it looks like this will produce 
the wrong result.

 





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98845999
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+  val prunedSchema = requestedFields
+.map { case (field, _) => field }
+.map(field => StructType(Array(field)))
+.reduceLeft(_ merge _)
+  val parquetDataColumnNames = dataSchema.fieldNames
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => parquetDataColumnNames.contains(f.name)))
+  val parquetDataFields = dataSchema.fields.toSet
+  val prunedDataFields = prunedDataSchema.fields.toSet
+
+  // If the original Parquet relation data fields are different from the
+  // pruned data fields, continue. Otherwise, return [[op]]
+  if (parquetDataFields != prunedDataFields) {
+val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames
+val newDataSchema =
+  StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = newDataSchema)(hadoopFsRelation.sparkSession)
+val outputMap = l.output.map(att => (att.name, att)).toMap
+
+// We need to map the output of the original logical relation
+// to the attributes of the pruned parquet schema where
+// possible so that references to those attributes elsewhere in
+// the query plan are not broken
+val expectedOutputAttributes =
+  prunedParquetRelation
+.schema
+.toAttributes.map(att => outputMap.getOrElse(att.name, att))
+val prunedRelation =
+  LogicalRelation(prunedParquetRelation, Some(expectedOutputAttributes))
+
+val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+
+// Construct a new target for our projection by rewriting and
+// including the original filters where available
+val projectionChild =
+ 

[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98845200
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => field }
+.map(field => StructType(Array(field)))
+.reduceLeft(_ merge _)
+  val parquetDataColumnNames = dataSchema.fieldNames
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
parquetDataColumnNames.contains(f.name)))
+  val parquetDataFields = dataSchema.fields.toSet
+  val prunedDataFields = prunedDataSchema.fields.toSet
+
+  // If the original Parquet relation data fields are different 
from the
+  // pruned data fields, continue. Otherwise, return [[op]]
+  if (parquetDataFields != prunedDataFields) {
+val dataSchemaFieldNames = 
hadoopFsRelation.dataSchema.fieldNames
+val newDataSchema =
+  StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
--- End diff --

Isn't `newDataSchema` just `prunedDataSchema`, if `dataSchemaFieldNames` 
is the same as `parquetDataColumnNames`?
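
To illustrate the point, here is a minimal sketch using a simplified stand-in 
type (`Field` is hypothetical, not Spark's `StructField`) and made-up column 
names. It assumes that `hadoopFsRelation.dataSchema` and the `dataSchema` bound 
in the pattern refer to the same schema, in which case both filters select the 
same fields:

```scala
object DuplicateFilterSketch {
  // Simplified stand-in for a schema field (an assumption for illustration).
  final case class Field(name: String)

  // Made-up data schema and pruned schema; "part" plays the role of a
  // partition column that is not part of the data schema.
  val dataSchema: Seq[Field] = Seq(Field("id"), Field("name"), Field("address"))
  val prunedSchema: Seq[Field] = Seq(Field("name"), Field("part"))

  // First computation, as in the diff.
  val parquetDataColumnNames: Seq[String] = dataSchema.map(_.name)
  val prunedDataSchema: Seq[Field] =
    prunedSchema.filter(f => parquetDataColumnNames.contains(f.name))

  // Second computation, built from the same schema, so it yields the same result.
  val dataSchemaFieldNames: Seq[String] = dataSchema.map(_.name)
  val newDataSchema: Seq[Field] =
    prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))
}
```

If that assumption holds in the patch, one of the two values could simply be 
reused.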





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98844948
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => field }
+.map(field => StructType(Array(field)))
+.reduceLeft(_ merge _)
+  val parquetDataColumnNames = dataSchema.fieldNames
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
parquetDataColumnNames.contains(f.name)))
+  val parquetDataFields = dataSchema.fields.toSet
+  val prunedDataFields = prunedDataSchema.fields.toSet
+
+  // If the original Parquet relation data fields are different 
from the
+  // pruned data fields, continue. Otherwise, return [[op]]
+  if (parquetDataFields != prunedDataFields) {
+val dataSchemaFieldNames = 
hadoopFsRelation.dataSchema.fieldNames
--- End diff --

I think `dataSchemaFieldNames` is just the same as `parquetDataColumnNames` above.





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98844304
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a proper field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => field }
--- End diff --

Those two `map` calls can be combined into one `map`.
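
A rough sketch of what this could look like, using simplified stand-ins 
(`Field` and `Struct` are hypothetical types, not Spark's `StructField` and 
`StructType`, and the toy `merge` only unions fields):

```scala
object CombineMapsSketch {
  // Simplified stand-ins for the Spark types (assumptions for illustration).
  final case class Field(name: String)
  final case class Struct(fields: Seq[Field]) {
    // Toy merge: union of the fields, keeping the first occurrence of each.
    def merge(that: Struct): Struct = Struct((fields ++ that.fields).distinct)
  }

  val requestedFields: Seq[(Field, Option[String])] =
    Seq((Field("a"), None), (Field("b"), Some("b")), (Field("a"), None))

  // Shape used in the diff: two consecutive map calls.
  val twoMaps: Struct = requestedFields
    .map { case (field, _) => field }
    .map(field => Struct(Seq(field)))
    .reduceLeft(_ merge _)

  // Suggested shape: a single map call that does both steps.
  val oneMap: Struct = requestedFields
    .map { case (field, _) => Struct(Seq(field)) }
    .reduceLeft(_ merge _)
}
```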





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-02-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98843749
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a [[StructField]] from a Catalyst complex 
type
+ * extractor. This is like the opposite of [[ExtractValue#apply]].
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
+// If this expression is an alias, work on its child instead
+val unaliased = expr match {
+  case Alias(child, _) => child
+  case expr => expr
+}
+selectField(unaliased, None)
+  }
+
+  /**
+   * Converts some chain of complex type extractors into a [[StructField]].
--- End diff --

It would be better to add a few example inputs and outputs to the comment.
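
For instance, the kind of input and output I have in mind can be shown with a 
toy model. Everything below is a hypothetical simplification, not the code in 
this PR: `Attr` stands in for `AttributeReference`, `GetField` for a struct 
field access that already carries its field, and nullability and array types 
are left out. The `Attr` case mirrors the one in the diff; the `GetField` case 
is my assumption about the intended behaviour.

```scala
object SelectedFieldSketch {
  // Toy data types and fields (assumptions for illustration).
  sealed trait DType
  case object StringT extends DType
  final case class StructT(fields: Seq[Field]) extends DType
  final case class Field(name: String, dataType: DType)

  // Toy expressions: a root attribute and a struct field access.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class GetField(child: Expr, field: Field) extends Expr

  // Walks from the outermost extractor down to the root attribute, wrapping
  // the selected subfield in one single-field struct per level.
  def selectField(expr: Expr, fieldOpt: Option[Field]): Option[Field] = expr match {
    case Attr(name) =>
      fieldOpt.map(f => Field(name, StructT(Seq(f))))
    case GetField(child, field) =>
      val selected = fieldOpt.map(f => Field(field.name, StructT(Seq(f)))).getOrElse(field)
      selectField(child, Some(selected))
  }

  // Input: the expression person.name.first, where name has fields first and last.
  val nameField: Field = Field("name", StructT(Seq(Field("first", StringT), Field("last", StringT))))
  val input: Expr = GetField(GetField(Attr("person"), nameField), Field("first", StringT))

  // Output: person: struct<name: struct<first: string>>, with last pruned away.
  val output: Option[Field] = selectField(input, None)
}
```

In this model a bare attribute with no field access yields `None`, since there 
is no subfield to select.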





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98833772
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field 
from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor 
which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

But we could find a better name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98833717
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

Oh, never mind. I thought `GetStructField` was another Scala extractor, but it
is actually a case class.





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-31 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98819150
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

What do you mean by combining it with the existing case class extractor?





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98809992
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

Or combine it with the `GetStructField` case class extractor?





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r98809770
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
+ * This is in contrast to the [[GetStructField]] case class extractor which returns the field
+ * ordinal instead of the field itself.
+ */
+private[planning] object GetStructField2 {
--- End diff --

Can we have a better name for this?





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16578#discussion_r96048060
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdownSuite.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+class AggregateFieldExtractionPushdownSuite extends PlanTest {
+  private val testRelation =
+    LocalRelation(
+      StructField("a", StructType(
+        StructField("a1", IntegerType) :: Nil)),
+      StructField("b", IntegerType),
+      StructField("c", StructType(
+        StructField("c1", IntegerType) :: Nil)))
+
+  object Optimizer extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Aggregate Field Extraction Pushdown", Once,
+        AggregateFieldExtractionPushdown) :: Nil
+  }
+
+  test("basic aggregate field extraction pushdown") {
+    val originalQuery =
+      testRelation
+        .select('a)
+        .groupBy('a getField "a1")('a getField "a1" as 'a1, Count('*))
--- End diff --

Replace it with `Count("*")`?





[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-01-13 Thread mallman
GitHub user mallman opened a pull request:

https://github.com/apache/spark/pull/16578

[SPARK-4502][SQL] Parquet nested column pruning

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the 
ability to read data from a subset of columns, efficiently skipping reads from 
other columns. Spark has long had support for pruning unneeded top-level schema 
fields from the scan of a parquet file. For example, consider a table, 
`contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, 
`name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to 
answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With
this patch, Spark reads only the `name.first` column to answer the previous
query.
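
As a rough illustration of the idea (a toy model, not the code in this patch), the sketch below represents a schema as a tree and prunes it to a set of requested field paths. The names `SchemaPruningSketch`, `Struct`, `Leaf`, `prune` and `leafColumns` are made up for this example and are not Spark APIs.

```scala
object SchemaPruningSketch {
  // Toy schema model: a field is either a leaf (one physical column)
  // or a struct of named fields. These are NOT Spark's types.
  sealed trait DataType
  case object Leaf extends DataType
  final case class Struct(fields: List[(String, DataType)]) extends DataType

  /** Keep only the parts of `schema` reachable through the requested paths. */
  def prune(schema: Struct, paths: Set[List[String]]): Struct = {
    val kept = schema.fields.flatMap { case (name, dataType) =>
      // Paths that start at this field, with the leading name stripped.
      val tails = paths.collect { case head :: tail if head == name => tail }
      if (tails.isEmpty) Nil // nothing under this field was requested
      else dataType match {
        // A path that ends here selects the whole field, nested or not.
        case _ if tails.contains(Nil) => List(name -> dataType)
        case nested: Struct => List(name -> prune(nested, tails))
        case Leaf => Nil // a path tried to descend into a leaf; ignore it
      }
    }
    Struct(kept)
  }

  /** The physical (leaf) columns of a schema, as dotted names. */
  def leafColumns(schema: Struct, prefix: String = ""): List[String] =
    schema.fields.flatMap {
      case (name, Leaf) => List(prefix + name)
      case (name, nested: Struct) => leafColumns(nested, prefix + name + ".")
    }

  // The `contacts` schema from the example above.
  val contacts: Struct = Struct(List(
    "name" -> Struct(List("first" -> Leaf, "last" -> Leaf)),
    "address" -> Leaf))
}
```

Pruning `contacts` to the single path `name.first` leaves one physical column in this model, which is the behavior this patch aims for in the parquet read schema.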

### Implementation

There are three main components of this patch. First, there is a 
`ParquetSchemaPruning` optimizer rule for gathering the required schema fields 
of a `PhysicalOperation` over a parquet file, constructing a new schema based 
on those required fields and rewriting the plan in terms of that pruned schema. 
The pruned schema fields are pushed down to the parquet requested read schema. 
`ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for 
rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals
of the parquet columns read are correct for the pruned schema.
`ParquetReadSupport` has been patched to address a compatibility mismatch
between Spark's built-in vectorized reader and the parquet-mr library's reader.

Third, we introduce two new catalyst query transformations, 
`AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`, to 
support schema pruning in aggregation and join query plans. These rules extract 
field references in aggregations and joins respectively, push down aliases to 
those references and replace them with references to the pushed down aliases. 
They use a new `SelectedField` extractor that transforms a catalyst complex 
type extractor (the "selected field") into a corresponding `StructField`.
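
To illustrate the general pattern behind a hand-written extractor such as `SelectedField`, here is a minimal sketch using toy stand-ins rather than catalyst classes. `Field`, `Column`, `GetField`, `GetFieldWithSchema` and `selectedFieldName` are all hypothetical names for this example. The point is only that a case class's generated extractor yields its constructor arguments (here an ordinal), while a custom `unapply` can resolve that ordinal to the field itself.

```scala
object ExtractorSketch {
  // Toy stand-ins for catalyst types; the real classes carry much more.
  final case class Field(name: String)
  sealed trait Expr
  final case class Column(name: String, fields: Vector[Field]) extends Expr
  // Matching on this case class yields (child, ordinal), as any case class does.
  final case class GetField(child: Expr, ordinal: Int) extends Expr

  // A hand-written extractor: matching yields the field instead of its ordinal.
  object GetFieldWithSchema {
    def unapply(expr: Expr): Option[(Expr, Field)] = expr match {
      case GetField(child @ Column(_, fields), ordinal) if fields.isDefinedAt(ordinal) =>
        Some((child, fields(ordinal)))
      case _ => None
    }
  }

  // Example use in a pattern match: no ordinal lookup at the call site.
  def selectedFieldName(expr: Expr): Option[String] = expr match {
    case GetFieldWithSchema(_, field) => Some(field.name)
    case _ => None
  }
}
```

Call sites that need the field's name or type can then match once and use the field directly.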

### Performance

The performance difference in executing queries with this patch compared to 
master is related to the depth of the table schema and the query itself. At 
VideoAmp, one of our biggest tables stores OpenRTB bid requests we receive from 
our exchange partners. Our bid request table's schema closely follows the 
OpenRTB bid request object schema. Additionally, when we bid we save our 
response along with the request in the same table. We store these two objects 
as two top-level fields in our table. Therefore, all bid request and response 
data are contained within nested fields.

For the purposes of measuring the performance impact of this patch, we ran 
some queries on our bid request table with the un-patched and patched master. 
We measured query execution time and the amount of data read from the 
underlying parquet files. I'll focus on a couple of benchmarks. (All benchmarks 
were run on an AWS EC2 cluster with four c3.8xl workers.) The first query I'll 
highlight is

```SQL
select count(request.device.ip) from event.bid_request where ds=20161128 
and h=0
```

(Hopefully it's obvious what this query means.) On the un-patched master, 
this query ran in 2.7 minutes and read 34.3 GB of data. On the patched master, 
this query ran in 4 seconds and read 987.3 MB of data.

We also ran a reporting-oriented query benchmark. I won't reproduce the 
query here, but it reads a larger subset of the bid request fields and joins 
against another table with a deeply nested schema. In addition to a join, we 
perform several aggregations in this query. On the un-patched master, this 
query ran in 3.4 minutes and read 34.6 GB of data. On the patched master, this 
query ran in 59 seconds and read 2.6 GB of data.
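
For reference, the ratios implied by the figures above can be worked out as follows. This is plain arithmetic on the quoted numbers; `BenchmarkRatios` and `Run` are names invented for this sketch, and the conversion of 987.3 MB assumes 1 GB = 1024 MB.

```scala
object BenchmarkRatios {
  // Times in seconds, data read in GB, taken from the text above.
  final case class Run(seconds: Double, gigabytesRead: Double)

  // Assumption: 1 GB = 1024 MB when converting the 987.3 MB figure.
  val countUnpatched: Run = Run(2.7 * 60, 34.3)
  val countPatched: Run = Run(4.0, 987.3 / 1024)
  val reportUnpatched: Run = Run(3.4 * 60, 34.6)
  val reportPatched: Run = Run(59.0, 2.6)

  // How many times faster the patched run finished.
  def speedup(before: Run, after: Run): Double =
    before.seconds / after.seconds

  // How many times less data the patched run read.
  def readReduction(before: Run, after: Run): Double =
    before.gigabytesRead / after.gigabytesRead
}
```

Under these assumptions the count query comes out at about 40x faster with about 35x less data read, and the reporting query at about 3.5x faster with about 13x less data read.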

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column 
pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. This 
patch introduces over two dozen new unit tests and has been running on a 
production Spark 1.5 cluster at VideoAmp for about a year. In that time, one