[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73450133
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +538,54 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table"
+      (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+      val requiredSchema = StructType(Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", IntegerType, nullable = false)
+      ))
+
+      val hadoopConf = new Configuration()
+
+      // Set the Parquet block size to make sure many row groups are produced.
+      hadoopConf.setInt("parquet.block.size", 8)
+      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+      hadoopConf.set(
+        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+        ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
+
+      val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100))
+      assert(filters.isDefined)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext1 =
+        new TaskAttemptContextImpl(hadoopConf, attemptId)
+      val hadoopAttemptContext2 =
+        new TaskAttemptContextImpl(hadoopConf, attemptId)
+
+      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext1.getConfiguration, filters.get)
--- End diff --

ok. Let me update it. Thanks.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73383148
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +538,54 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table"
+      (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+      val requiredSchema = StructType(Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", IntegerType, nullable = false)
+      ))
+
+      val hadoopConf = new Configuration()
+
+      // Set the Parquet block size to make sure many row groups are produced.
+      hadoopConf.setInt("parquet.block.size", 8)
+      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+      hadoopConf.set(
+        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+        ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
+
+      val filters = ParquetFilters.createFilter(requiredSchema, sources.LessThan("a", 100))
+      assert(filters.isDefined)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext1 =
+        new TaskAttemptContextImpl(hadoopConf, attemptId)
+      val hadoopAttemptContext2 =
+        new TaskAttemptContextImpl(hadoopConf, attemptId)
+
+      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext1.getConfiguration, filters.get)
--- End diff --

I think we should test that ParquetInputFormat does the push-down; otherwise we are just testing parquet itself.

In order to know how many row groups are read, we could add a SQL metric for that.
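
For illustration, a row-group-level assertion built on such a metric could look like the sketch below. This is hypothetical: it assumes a metric named `numRowGroups` were added to the Parquet scan node (no such metric exists at this point) and reuses `spark` and `path` from the test above.

```scala
// Hypothetical sketch: assumes the scan node exposed a "numRowGroups"
// SQL metric counting the row groups actually read.
val df = spark.read.parquet(path).filter("a < 100")
df.collect()

val rowGroupsRead = df.queryExecution.executedPlan.collectFirst {
  case scan if scan.metrics.contains("numRowGroups") =>
    scan.metrics("numRowGroups").value
}
// Every row group only contains a = 101, so all of them should be skipped.
assert(rowGroupsRead.contains(0L))
```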





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73334257
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
--- End diff --

Yeah, since ParquetRecordReader also uses the Configuration to get the pushed-down filters, I think this also fixes SPARK-16321.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r7937
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
--- End diff --

We already have a test for the pushed-down filters for ParquetRecordReader, but it works at the individual-record level. If you mean the row-group level: since ParquetRecordReader doesn't expose a row-group count, I don't think we can check whether the filter was pushed down. Besides, that seems to be ParquetRecordReader's own functionality, and I think it should be unit-tested in the parquet project.
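
For what it's worth, the number of row groups a file contains can be read from its footer, even though the reader doesn't report how many it actually read. A minimal sketch, assuming `fileName` (a hypothetical variable here) points at a single Parquet part file and the parquet-hadoop 1.7/1.8-era footer API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Read only the footer and count the row groups (blocks) it declares.
val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path(fileName), ParquetMetadataConverter.NO_FILTER)
val numRowGroups = footer.getBlocks.size()
```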





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread maver1ck
Github user maver1ck commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73304180
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
--- End diff --

@viirya 
I mean that we can also add a test to check whether we correctly push filters into ParquetRecordReader.
Did you know that you're also resolving SPARK-16321 
(https://github.com/apache/spark/pull/14465)?






[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73300059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -581,62 +586,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
--- End diff --

Looks safe to remove overrideMinSplitSize, so I also deleted it in the new commit.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73296603
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

When I added a regression test, I found that this change is not needed anymore. I'm not sure which recent commit made resolution keep the metadata. I will still add the regression test.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73295333
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table"
+      (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+      val requiredSchema = StructType(Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", IntegerType, nullable = false)
+      ))
+
+      val hadoopConf = new Configuration()
+      hadoopConf.setInt("parquet.block.size", 1)
--- End diff --

Ah, I see. Thanks. Maybe this should be commented.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73295082
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table"
+      (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+      val requiredSchema = StructType(Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", IntegerType, nullable = false)
+      ))
+
+      val hadoopConf = new Configuration()
+      hadoopConf.setInt("parquet.block.size", 1)
--- End diff --

Just to make sure it produces many row groups.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73293974
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table"
+      (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+      val requiredSchema = StructType(Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", IntegerType, nullable = false)
+      ))
+
+      val hadoopConf = new Configuration()
+      hadoopConf.setInt("parquet.block.size", 1)
--- End diff --

Do you mind if I ask why the preferred row group size should be 1 byte?





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73293200
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
--- End diff --

For the non-vectorized reader, we use parquet's `ParquetRecordReader` and push the filters into it.
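
For reference, the hand-off looks roughly like this (paraphrased from `ParquetFileFormat.buildReader`; the exact shape may differ by version):

```scala
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetRecordReader
import org.apache.spark.sql.catalyst.InternalRow

// The non-vectorized path wraps the pushed-down FilterPredicate with
// FilterCompat and passes it into Parquet's own record reader.
val reader = pushed match {
  case Some(filter) =>
    new ParquetRecordReader[InternalRow](
      new ParquetReadSupport, FilterCompat.get(filter, null))
  case _ =>
    new ParquetRecordReader[InternalRow](new ParquetReadSupport)
}
```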





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-03 Thread maver1ck
Github user maver1ck commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73290562
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -527,4 +536,43 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Filters should be pushed down for vectorized Parquet reader at row group level") {
--- End diff --

What about the non-vectorized reader?





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73259058
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -581,62 +586,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
--- End diff --

Thanks for the kind explanation!





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73258703
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -581,62 +586,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
--- End diff --

I discussed this with @yhuai offline. Because the default split size (128M) matches the default row group size (128M) in parquet, we will not regress in most cases. It's true that a user may still have a different value in the Hadoop configuration that we no longer respect, but I think that's fine.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73253577
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -581,62 +586,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
--- End diff --

@davies Do you mind if I ask a question? Does removing overrideMinSplitSize mean that https://issues.apache.org/jira/browse/SPARK-10143 no longer applies? I thought not using this function would be a regression.

I am pretty sure that you took a look at this as well, but I just want to double-check, out of curiosity.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73247832
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -581,62 +586,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
--- End diff --

I looked at all of these; it seems safe to remove them (including overrideMinSplitSize).





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73247716
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

@viirya Should we fix that in `l.resolve()`? cc @marmbrus





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73207167
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

@viirya Could you add a regression test for this (it's easy to forget)?





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-08-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r73206857
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -357,6 +357,11 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      pushed.foreach {
--- End diff --

Without knowing that `pushed` is an Option, this makes me think it could be wrong. Since setFilterPredicate() should only be called once, it's better to write it like this:
```
if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
```






[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r68353312
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

We use metadata in the merged schema to mark optional fields (fields not existing in all partitions), and that metadata is lost after resolving. If we don't add it back, the pushed-down filters will fail with a non-existing-field error.
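
A small illustration of the mechanism (the metadata key here is made up; Spark uses an internal key to mark optional fields in a merged schema):

```scala
import org.apache.spark.sql.types._

// Illustrative only: schema merging can tag a field with metadata, and
// a.withMetadata(f.metadata) above copies such tags back onto the
// attribute after resolution would otherwise have dropped them.
val md = new MetadataBuilder().putBoolean("optionalField", true).build()
val tagged = StructField("c", IntegerType, nullable = true, metadata = md)
assert(tagged.metadata.getBoolean("optionalField"))
```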





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-06-23 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r68352913
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

I guess a better question is whether it is part of the bug fix.





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-06-23 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13701#discussion_r68352884
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
@@ -85,8 +85,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
-      val dataColumns =
-        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      val dataColumns = l.resolve(fsRelation.dataSchema,
+        fsRelation.sparkSession.sessionState.analyzer.resolver).map { c =>
+          fsRelation.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
--- End diff --

Do we need this?





[GitHub] spark pull request #13701: [SPARK-15639][SQL] Try to push down filter at Row...

2016-06-15 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/13701

 [SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized parquet reader, tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never set the filters to push down into the configuration; in other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch fixes this by setting up the filters in the configuration for the reader.
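
The core of the change, condensed from the diff (here `pushed` and `hadoopAttemptContext` are the values already built in `buildReader`):

```scala
// Condensed from the patch: once the Hadoop attempt context exists,
// write the created FilterPredicate into its configuration so that
// SpecificParquetRecordReaderBase picks it up for RowGroups-level filtering.
if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(
    hadoopAttemptContext.getConfiguration, pushed.get)
}
```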

A benchmark that excludes the time of writing the Parquet file:

```scala
test("Benchmark for Parquet") {
  val N = 1 << 50
  withParquetTable((0 until N).map(i => (101, i)), "t") {
    val benchmark = new Benchmark("Parquet reader", N)
    benchmark.addCase("reading Parquet file", 10) { iter =>
      sql("SELECT _1 FROM t where t._1 < 100").collect()
    }
    benchmark.run()
  }
}
```

`withParquetTable` by default runs the test with both the vectorized and non-vectorized readers. I only let it run the vectorized reader.

After this patch:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_25-b17 on Linux 3.13.0-57-generic
Westmere E56xx/L56xx/X56xx (Nehalem-C)
Parquet reader:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
reading Parquet file                       76 /   88          3.4         291.0       1.0X
```

Before this patch:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_25-b17 on Linux 3.13.0-57-generic
Westmere E56xx/L56xx/X56xx (Nehalem-C)
Parquet reader:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
reading Parquet file                       81 /   91          3.2         310.2       1.0X
```

Next, I ran the benchmark for the non-pushdown case using the same benchmark code, but with the pushdown configuration disabled.

After this patch:

```
Parquet reader:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
reading Parquet file                       80 /   95          3.3         306.5       1.0X
```

Before this patch:

```
Parquet reader:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
reading Parquet file                       80 /  103          3.3         306.7       1.0X
```

For the non-pushdown case, the results suggest that this patch doesn't affect the normal code path.

I manually printed the `totalRowCount` in `SpecificParquetRecordReaderBase` to see whether this patch actually filters the row groups. When running the above benchmark:

After this patch:
`totalRowCount = 0`

Before this patch:
`totalRowCount = 131072`


## How was this patch tested?
Existing tests should pass.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 vectorized-reader-push-down-filter2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13701


commit 5687a3b5527817c809244305468bfe4968bedcec
Author: Liang-Chi Hsieh 
Date:   2016-05-28T05:03:06Z

    Try to push down filter at RowGroups level for parquet reader.

commit 077f7f8813a76d38c8a6d898ec54e401c91b6014
Author: Liang-Chi Hsieh 
Date:   2016-06-09T21:19:47Z

    Merge remote-tracking branch 'upstream/master' into vectorized-reader-push-down-filter

    Conflicts:
        sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

commit 97ccacfca1f7a039bc7bf7b8a4f8f975deb70197
Author: Liang-Chi Hsieh 
Date:   2016-06-14T07:22:53Z

    Merge remote-tracking branch 'upstream/master' into vectorized-reader-push-down-filter



