[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14690


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-14 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83518291
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

> Why can't we just call client.getPartitionsByFilter in this method and 
put all other logic at caller side?

There are two reasons:

1. This method is intended to return exactly the partitions matching the 
given filter.
2. This method is called from multiple places. I feel this is an acceptable 
place to centralize this common logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-14 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83517834
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 ---
@@ -17,32 +17,26 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 
 /**
  * A [[FileCatalog]] that generates the list of files to process by 
recursively listing all the
  * files present in `paths`.
  *
+ * @param rootPaths the list of root table paths to scan
  * @param parameters as set of options to control discovery
- * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to 
provide types for the
  *discovered partitions
  */
 class ListingFileCatalog(
 sparkSession: SparkSession,
-override val paths: Seq[Path],
--- End diff --

In earlier versions of this PR, these paths were renamed to `rootPaths` to 
indicate that they were only the filesystem "roots" for this table—no 
partition directories. This restriction was relaxed in a later iteration of the 
PR to address a shortcoming of the new architecture.

The name `paths` probably makes more sense now, but I'm reluctant to push 
any non-critical commits right now. The last rebase was a killer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83356659
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

Well, `filtering the partition values again` seems a hive specific logic, 
as hive metastore may not support partition pruning and still return all 
partitions. LGTM then


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83355030
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory 
structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

The modified unit tests did surface a bug, but it's tricky in how it 
manifested itself. Very briefly, when adding missing partitions for partition 
columns with an upper-case letter `msck repair table` works, but `alter table 
... add partition` does not. The latter actually creates new directories. For 
example, here's a partitioned table listing before calling `add partition`:

```
-rw-r--r--   3 msa msa  0 2016-10-14 03:52 test_mixed_case/_SUCCESS
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=0
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=1
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=2
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=3
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=4
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=5
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=6
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=7
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=8
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=9
```

Here's the listing after calling `add partition` for each partition in turn:

```
-rw-r--r--   3 msa msa  0 2016-10-14 03:52 test_mixed_case/_SUCCESS
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=0
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=1
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=2
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=3
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=4
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=5
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=6
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=7
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=8
drwxr-xr-x   - msa msa  0 2016-10-14 03:52 test_mixed_case/partCol=9
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=0
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=1
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=2
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=3
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=4
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=5
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=6
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=7
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=8
drwxr-xr-x   - msa msa  0 2016-10-14 03:53 test_mixed_case/partcol=9
```

I'll file an issue when I have a moment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83352979
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

We probably could, but why?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83352838
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

Why can't we just call `client.getPartitionsByFilter` in this method and 
put all other logic at caller side?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83349072
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory 
structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

Ok. I'll report an issue if I find a problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83348167
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the 
client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = 
metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by 
ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = 
metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale 
classes
--- End diff --

I don't quite understand the issue, but if you reference the Counter object 
directly from the caller sites then you get

```
[info] Exception encountered when attempting to run a suite with class 
name: org.apache.spark.sql.hive.HiveDataFrameSuite *** ABORTED *** (12 seconds, 
51 milliseconds)
[info]   java.lang.LinkageError: loader constraint violation: loader 
(instance of org/apache/spark/sql/hive/client/IsolatedClientLoader$$anon$1) 
previously initiated loading for a different type with name 
"com/codahale/metrics/Counter"
[info]   at java.lang.ClassLoader.defineClass1(Native Method)
[info]   at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
[info]   at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[info]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83348200
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the 
client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = 
metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by 
ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = 
metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale 
classes
--- End diff --

Hm, maybe this is a load order issue


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83348022
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the 
client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = 
metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by 
ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = 
metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale 
classes
--- End diff --

I don't quite understand this comment. what issue do this 2 method address?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83347741
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
--- End diff --

Ah, the reason you can't is because to register this source it needs to be 
in the list above. This made me realize I forgot to add it to the list 
actually: https://github.com/VideoAmp/spark-public/pull/6


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83347636
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the 
client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = 
metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by 
ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = 
metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
--- End diff --

should we mention that this is for testing only?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83347251
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
 metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
--- End diff --

should we move it to sql module?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83342839
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory 
structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

Can you revert these changes? I think we are close to something mergeable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83341181
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

I'm on linux, which should be case-sensitive? Were you able to reproduce 
with the snippet I tried locally?

Anyways, we should probably address this issue in another ticket.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83340821
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2602,7 +2602,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
   def inputFiles: Array[String] = {
-val files: Seq[String] = logicalPlan.collect {
+val files: Seq[String] = queryExecution.optimizedPlan.collect {
--- End diff --

We only determine the partitions read after optimization, so it's necessary 
to read it from that instead of the logical plan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83340736
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2602,7 +2602,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
   def inputFiles: Array[String] = {
-val files: Seq[String] = logicalPlan.collect {
+val files: Seq[String] = queryExecution.optimizedPlan.collect {
--- End diff --

why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83340504
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 ---
@@ -477,6 +478,15 @@ class InMemoryCatalog(
 catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+// TODO: Provide an implementation
+throw new UnsupportedOperationException(
+  "listPartitionsByFilter is not implemented")
--- End diff --

because not all versions of hive metastore support partition pruning, so in 
our code we expect `listPartitionsByFilter` may still return all partitions. 
But anyway, throwing exception here is least surprising.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83325529
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,19 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+val locationDesc =
+  location.getClass.getSimpleName + seqToString(location.rootPaths)
--- End diff --

This style emulates the way relations are shown in a query plan, e.g.

`Relation[a#1, b#2]`

I'm in favor of keeping this as-is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83325088
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
+  partitionSchema,
+  _,
+  _,
+  _,
+  _),
+_,
+_))
+if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+  // The attribute name of predicate could be different than the one 
in schema in case of
+  // case insensitive, we should change them to match the one in 
schema, so we donot need to
+  // worry about case sensitivity anymore.
+  val normalizedFilters = filters.map { e =>
+e transform {
+  case a: AttributeReference =>
+
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+}
+  }
+
+  val sparkSession = fsRelation.sparkSession
+  val partitionColumns =
+logicalRelation.resolve(
+  partitionSchema, sparkSession.sessionState.analyzer.resolver)
+  val partitionSet = AttributeSet(partitionColumns)
+  val partitionKeyFilters =
+
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+
+  if (partitionKeyFilters.nonEmpty) {
+  val prunedFileCatalog = 
tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+  val prunedFsRelation =
+fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+  val prunedLogicalRelation = logicalRelation.copy(relation = 
prunedFsRelation)
+
+  // Keep partition-pruning predicates so that they are visible in 
physical planning
+  val filterExpression = filters.reduceLeft(And)
--- End diff --

I pushed a commit to show partition count for partitioned tables.

I also did some informal A/B perf testing keeping and omitting the 
partition pruning filters. I saw no discernible performance difference.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83318096
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

> I think there is an issue in the modified test, since the 
ParquetDataWithKey class still has single-p as a field. That would cause false 
positive failures.
>
> I tried locally to reproduce a mixed-case issue, with no success, e.g.

BTW, are you testing with a case-sensitive filesystem? My revised 
`ParquetMetastoreSuite` tests passed for me on a case-insensitive filesystem. I 
switched to a case-sensitive filesystem and they fail.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83302513
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -230,13 +230,21 @@ case class FileSourceScanExec(
 val location = relation.location
 val locationDesc =
   location.getClass.getSimpleName + seqToString(location.rootPaths)
-Map(
-  "Format" -> relation.fileFormat.toString,
-  "ReadSchema" -> outputSchema.catalogString,
-  "Batched" -> supportsBatch.toString,
-  "PartitionFilters" -> seqToString(partitionFilters),
-  "PushedFilters" -> seqToString(dataFilters),
-  "Location" -> locationDesc)
+val metadata =
+  Map(
+"Format" -> relation.fileFormat.toString,
+"ReadSchema" -> outputSchema.catalogString,
+"Batched" -> supportsBatch.toString,
+"PartitionFilters" -> seqToString(partitionFilters),
+"PushedFilters" -> seqToString(dataFilters),
+"Location" -> locationDesc)
+val withOptPartitionCount =
+  relation.partitionSchemaOption.map { _ =>
+metadata + ("PartitionCount" -> selectedPartitions.size.toString)
--- End diff --

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83141382
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

I tested this with unit tests from two test suites on two branches. The 
first test suite was `SQLQuerySuite` from the Hive codebase, specifically the 
test "SPARK-10562: partition by column with mixed case name". The second test 
suite was (a modified) `ParquetMetastoreSuite`. I modified the name of the 
partition column in the partitioned tables in the latter suite from `p` to 
`pQ`. The two branches on which I tested were this PR and commit 8d33e1e from 
the master branch.

The first test suite passed on both branches. I guess that's to be expected 
since our Jenkins bot has been reporting it as passed.

The second suite failed (as modified) on both branches. In both branches, 
Spark SQL failed to find the partitions on-disk. This makes me wonder:

1. Is this a known/accepted limitation?
1. If unknown, is this an acceptable limitation or a bug to be fixed?

The best I found regarding support for mixed-case partition columns was in 
https://issues.apache.org/jira/browse/SPARK-10562. Unlike in the first test 
(which uses the `saveAsTable` method), the tables in `ParquetMetastoreSuite` 
are built with SQL DDL.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83131630
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -199,59 +197,30 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
 val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
   val partitionSchema = 
StructType.fromAttributes(metastoreRelation.partitionKeys)
-  val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-  // We're converting the entire table into HadoopFsRelation, so 
predicates to Hive metastore
-  // are empty.
-  val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
-val location = p.getLocation
-val values = 
InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
-  case (rawValue, dataType) => Cast(Literal(rawValue), 
dataType).eval(null)
-})
-PartitionDirectory(values, location)
-  }
-  val partitionSpec = PartitionSpec(partitionSchema, partitions)
--- End diff --

Ok, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83128973
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -199,59 +197,30 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
 val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
   val partitionSchema = 
StructType.fromAttributes(metastoreRelation.partitionKeys)
-  val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-  // We're converting the entire table into HadoopFsRelation, so 
predicates to Hive metastore
-  // are empty.
-  val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
-val location = p.getLocation
-val values = 
InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
-  case (rawValue, dataType) => Cast(Literal(rawValue), 
dataType).eval(null)
-})
-PartitionDirectory(values, location)
-  }
-  val partitionSpec = PartitionSpec(partitionSchema, partitions)
--- End diff --

Nope, that should be good to go.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83115827
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -199,59 +197,30 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
 val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
   val partitionSchema = 
StructType.fromAttributes(metastoreRelation.partitionKeys)
-  val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-  // We're converting the entire table into HadoopFsRelation, so 
predicates to Hive metastore
-  // are empty.
-  val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
-val location = p.getLocation
-val values = 
InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
-  case (rawValue, dataType) => Cast(Literal(rawValue), 
dataType).eval(null)
-})
-PartitionDirectory(values, location)
-  }
-  val partitionSpec = PartitionSpec(partitionSchema, partitions)
--- End diff --

Does this invalidate any of the work you've done in 
https://github.com/VideoAmp/spark-public/pull/3?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83106495
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -55,10 +55,16 @@ class TableFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
 case Some(partitionSchema) =>
-  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+  val catalogTablePartitions = 
externalCatalog.listPartitionsByFilter(db, table, filters)
--- End diff --

Actually, this breaks inputFiles as well when there are mixed partition 
paths as in "SPARK-15248: explicitly added partitions should be readable".

I'm going to look into this more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83086945
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
+  partitionSchema,
+  _,
+  _,
+  _,
+  _),
+_,
+_))
+if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+  // The attribute name of predicate could be different than the one 
in schema in case of
+  // case insensitive, we should change them to match the one in 
schema, so we donot need to
+  // worry about case sensitivity anymore.
+  val normalizedFilters = filters.map { e =>
+e transform {
+  case a: AttributeReference =>
+
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+}
+  }
+
+  val sparkSession = fsRelation.sparkSession
+  val partitionColumns =
+logicalRelation.resolve(
+  partitionSchema, sparkSession.sessionState.analyzer.resolver)
+  val partitionSet = AttributeSet(partitionColumns)
+  val partitionKeyFilters =
+
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+
+  if (partitionKeyFilters.nonEmpty) {
+  val prunedFileCatalog = 
tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+  val prunedFsRelation =
+fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+  val prunedLogicalRelation = logicalRelation.copy(relation = 
prunedFsRelation)
+
+  // Keep partition-pruning predicates so that they are visible in 
physical planning
+  val filterExpression = filters.reduceLeft(And)
--- End diff --

Also, I'll try some A/B testing to examine the performance impact of 
removing these partition pruning filters here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83085289
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

I'll test this and report back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83085625
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
--- End diff --

I very recently pushed a commit that refactors this to parallelize it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83078271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -55,10 +55,16 @@ class TableFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
 case Some(partitionSchema) =>
-  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+  val catalogTablePartitions = 
externalCatalog.listPartitionsByFilter(db, table, filters)
--- End diff --

Nevermind, this fails since we try to infer partitioning there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83065199
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

Case sensitive? Or have we already make sure that they are in the same case 
somewhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83071827
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
--- End diff --

Should we put a TODO here (support other catalog)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83067223
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
--- End diff --

Should we do this in parallel? (Could be another PR)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83065354
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
+BoundReference(index, partitionSchema(index).dataType, 
nullable = true)
+})
+  clientPrunedPartitions.filter { case CatalogTablePartition(spec, _, 
_) =>
+val row =
+  InternalRow.fromSeq(partitionSchema.map { case StructField(name, 
dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+boundPredicate(row)
--- End diff --

We should have a logging for false-positive from Hive client?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83072226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
+  partitionSchema,
+  _,
+  _,
+  _,
+  _),
+_,
+_))
+if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+  // The attribute name of predicate could be different than the one 
in schema in case of
+  // case insensitive, we should change them to match the one in 
schema, so we donot need to
+  // worry about case sensitivity anymore.
+  val normalizedFilters = filters.map { e =>
+e transform {
+  case a: AttributeReference =>
+
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+}
+  }
+
+  val sparkSession = fsRelation.sparkSession
+  val partitionColumns =
+logicalRelation.resolve(
+  partitionSchema, sparkSession.sessionState.analyzer.resolver)
+  val partitionSet = AttributeSet(partitionColumns)
+  val partitionKeyFilters =
+
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+
+  if (partitionKeyFilters.nonEmpty) {
+  val prunedFileCatalog = 
tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+  val prunedFsRelation =
+fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+  val prunedLogicalRelation = logicalRelation.copy(relation = 
prunedFsRelation)
+
+  // Keep partition-pruning predicates so that they are visible in 
physical planning
+  val filterExpression = filters.reduceLeft(And)
--- End diff --

The the partitions/files are already filtered out, do we really keep these 
predicates? That also help performance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83071819
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -55,10 +55,16 @@ class TableFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
 case Some(partitionSchema) =>
-  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+  val catalogTablePartitions = 
externalCatalog.listPartitionsByFilter(db, table, filters)
--- End diff --

Would filterPartitions(filters).listFiles(Nil) work here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83061877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,19 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+val locationDesc =
+  location.getClass.getSimpleName + seqToString(location.rootPaths)
--- End diff --

Should they be separated by space?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-12 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r83036315
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
+  partitionSchema,
+  _,
+  _,
+  _,
+  _),
+_,
+_))
+if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+  // The attribute name of predicate could be different than the one 
in schema in case of
+  // case insensitive, we should change them to match the one in 
schema, so we donot need to
+  // worry about case sensitivity anymore.
+  val normalizedFilters = filters.map { e =>
--- End diff --

I tried this before. IIRC, the issue is that in a join query the 
`JoinSelection` rule runs before the `FileSourceStrategy` because it matches 
the logical plan first.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82902172
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

> Btw, you might want to take(10) on the root paths to avoid materializing 
a large string here before it gets truncated.

Actually, since `metadata` is used in various places for query plan 
equivalence checking it seems we need to build the whole string, but with 
sorted paths:

seqToString(location.rootPaths.sorted)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82900810
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

> Is it different from location.getRootPaths.length? I guess that would 
only be meaningful for Hive-backed tables, but that seems ok.

This is true for relations that have been pruned by 
`PruneFileSourcePartitions`. I'm looking for a tidier solution.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-11 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82875191
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

Is it different from location.getRootPaths.length? I guess that would only 
be meaningful for Hive-backed tables, but that seems ok.

Btw, you might want to take(10) on the root paths to avoid materializing a 
large string here before it gets truncated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82850487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

I'm going to pull this value out to a local `val`. The reason I don't want 
to make this part of the `BasicFileCatalog` type is that this format seems 
specific to this class's `metadata` method. For example, we're using the 
method-local `seqToString` method to format the `rootPaths` list.

Regarding the partition count, this is actually a little trickier than I 
expected. I'm going to expand on that in a standalone PR comment after the next 
push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82715947
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
--- End diff --

Not sure, but the indentation is more consistent with a newline after 
`Map(`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82715954
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 ---
@@ -477,6 +478,15 @@ class InMemoryCatalog(
 catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+// TODO: Provide an implementation
+throw new UnsupportedOperationException(
+  "listPartitionsByFilter is not implemented")
--- End diff --

That's true. I guess this is fine for now; we can add back an 
implementation later when needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82713318
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

I considered that. I agree this doesn't look very good. I'll turn this over 
in my head and push something better.

Also, I'll add the partition count. Good idea there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82713068
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
--- End diff --

I don't know. Does Spark have a guideline for this? (Also, I'm not entirely 
sure I know what you're referring to here.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82712965
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 ---
@@ -477,6 +478,15 @@ class InMemoryCatalog(
 catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+// TODO: Provide an implementation
+throw new UnsupportedOperationException(
+  "listPartitionsByFilter is not implemented")
--- End diff --

It isn't? In the spirit of the principle of least surprise, my intention 
for this method was that it return exactly the partitions matching the given 
predicates.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82708580
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case op @ PhysicalOperation(projects, filters,
+logicalRelation @
+  LogicalRelation(fsRelation @
+HadoopFsRelation(
+  tableFileCatalog: TableFileCatalog,
+  partitionSchema,
+  _,
+  _,
+  _,
+  _),
+_,
+_))
+if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+  // The attribute name of predicate could be different than the one 
in schema in case of
+  // case insensitive, we should change them to match the one in 
schema, so we donot need to
+  // worry about case sensitivity anymore.
+  val normalizedFilters = filters.map { e =>
--- End diff --

Could we de-duplicate this code from that in FileSourceStrategy?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82708474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+
+/**
+ * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and 
the ability to find leaf
+ * files in a list of HDFS paths.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param ignoreFileNotFound (see [[ListingFileCatalog]])
+ */
+abstract class SessionFileCatalog(sparkSession: SparkSession)
--- End diff --

Note to self: moved verbatim


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82709034
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -619,6 +620,44 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = 
withClient {
+val catalogTable = client.getTable(db, table)
+val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+val nonPartitionPruningPredicates = predicates.filterNot {
+  _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+}
+
+if (nonPartitionPruningPredicates.nonEmpty) {
+sys.error("Expected only partition pruning predicates: " +
+  predicates.reduceLeft(And))
+}
+
+val partitionSchema = catalogTable.partitionSchema
+
+if (predicates.nonEmpty) {
+  val clientPrunedPartitions =
+client.getPartitionsByFilter(catalogTable, predicates)
+  val boundPredicate =
+InterpretedPredicate.create(predicates.reduce(And).transform {
+  case att: AttributeReference =>
+val index = partitionSchema.indexWhere(_.name == att.name)
+BoundReference(index, partitionSchema(index).dataType, 
nullable = true)
+})
+  clientPrunedPartitions.filter { case CatalogTablePartition(spec, _, 
_) =>
+val row =
+  InternalRow.fromSeq(partitionSchema.map { case StructField(name, 
dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+boundPredicate(row)
+  }
+} else {
--- End diff --

Any way to de-dup the pruning logic from PartitioningAwareFileCatalog?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82707347
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 ---
@@ -477,6 +478,15 @@ class InMemoryCatalog(
 catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+  db: String,
+  table: String,
+  predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+// TODO: Provide an implementation
+throw new UnsupportedOperationException(
+  "listPartitionsByFilter is not implemented")
--- End diff --

I think you can just return all the partitions, since it's not required for 
correctness.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82707709
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
+  "ReadSchema" -> outputSchema.catalogString,
+  "Batched" -> supportsBatch.toString,
+  "PartitionFilters" -> seqToString(partitionFilters),
+  "PushedFilters" -> seqToString(dataFilters),
+  "Location" -> (location.getClass.getSimpleName + 
seqToString(location.rootPaths)))
--- End diff --

Should this be a method of location? Btw, including the number of 
partitions would be nice too, since it gets truncated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-10-10 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r82707684
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -225,13 +225,16 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for 
equality checking.
-  override val metadata: Map[String, String] = Map(
-"Format" -> relation.fileFormat.toString,
-"ReadSchema" -> outputSchema.catalogString,
-"Batched" -> supportsBatch.toString,
-"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-"PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+val location = relation.location
+Map("Format" -> relation.fileFormat.toString,
--- End diff --

nit: should be on next line as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77428539
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 ---
@@ -346,11 +340,30 @@ trait FileCatalog {
*/
   def listFiles(filters: Seq[Expression]): Seq[Partition]
 
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising 
a relation and, from
+ * those, infer the relation's partition specification.
+ */
+trait FileCatalog extends BasicFileCatalog {
--- End diff --

Makes sense. I feel like the naming here is a little weird but maybe it's 
best to clean this up in a separate PR to avoid too many changes here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77428351
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
+val values =
+  InternalRow.fromSeq(partitionSchema.map { case 
StructField(name, dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+Partition(values, files)
+  }
+  }
+case None =>
+  Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: 
Nil
+  }
+
+  override def refresh(): Unit = {}
+
+
+  /**
+   * Returns a [[ListingFileCatalog]] for this table restricted to the 
subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
--- End diff --

> To be more specific, this is particularly helpful in validating correct 
pruning behavior in left and right outer joins. Sure, an expert will understand 
which sides' partition pruning filters need to go in the "join" clause and 
which in the "where" clause. This is not the case with less experienced users 
and even experienced users can forget which goes where and when.

Btw I think you could look at pushed partition filters in the physical scan 
node to determine if pruning is happening as expected.

> Yes, have a broadcast hint. The problem we've had with that is that we 
haven't found a way to apply that in SQL queries. (Is there a syntax in the 
parser for that?)

There's a pr out for this (not yet merged) 
https://github.com/apache/spark/pull/14426


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-

[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77426981
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2531,6 +2531,8 @@ class Dataset[T] private[sql](
*/
   def inputFiles: Array[String] = {
 val files: Seq[String] = logicalPlan.collect {
+  case LogicalRelation(HadoopFsRelation(_, location: FileCatalog, _, 
_, _, _, _), _, _) =>
--- End diff --

Good question. It seems a call to `Dataset.inputFiles` which reads from a 
partitioned table will return an empty array here. On the other hand, asking 
for the input files for a large partitioned table will require a costly tree 
traversal.

I'll have to think about this some more before I have an opinion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77426759
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 ---
@@ -346,11 +340,30 @@ trait FileCatalog {
*/
   def listFiles(filters: Seq[Expression]): Seq[Partition]
 
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising 
a relation and, from
+ * those, infer the relation's partition specification.
+ */
+trait FileCatalog extends BasicFileCatalog {
--- End diff --

Yes, I want to avoid/discourage file traversals of partitions which aren't 
part of a user's query. Moving `allFiles()` out of the root of the file catalog 
hierarchy let's us omit it from `TableFileCatalog`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77426633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
+val values =
+  InternalRow.fromSeq(partitionSchema.map { case 
StructField(name, dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+Partition(values, files)
+  }
+  }
+case None =>
+  Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: 
Nil
+  }
+
+  override def refresh(): Unit = {}
+
+
+  /**
+   * Returns a [[ListingFileCatalog]] for this table restricted to the 
subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
--- End diff --

I see your point about avoiding the overhead of pruning in planning. The 
flip side of that is surfacing information valuable to both the planner and the 
user in the query plan itself. This is why our (VideoAmp's) clone of Spark does 
partition pruning in the optimization phase (as it was in the early days of 
Spark SQL, I think Spark pre-1.3).

We've found that seeing the actual partitions that will be read in the 
query plan (in "explain extended", for example) has been invaluable in 
validating that a query is going to read the files we think it's going to read 
instead of the entire freakin' table.

To be more specific, this is particularly helpful in validating correct 
pruning behavior in left and right outer joins. Sure, an expert will understand 
which sides' partition pruning filters need to go in the "join" clause and 
which in the "where" clause. This is not the case with less experienced users 
and even experienced users can forget which goes where and when.

In practice, performing partition pruning in query planning has not 
presented a problem for us. The operation is either performed in planning or in 
execution. Pe

[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77426636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
--- End diff --

Maybe it's ok for now, but I think it makes the dependencies more clear, 
otherwise it's not obvious this class basically delegates most of its work to 
`ExternalCatalog`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77426535
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging {
 
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
+  val prunedFsRelation = fsRelation.location match {
--- End diff --

That's correct. Let's wait for #14750 then. My feeling here is that schema 
inference over a subset of files is not acceptable since then the schema can 
vary with the partition predicate.

For the broadcast join optimization, I think that is a larger issue that 
should not affect the design of this pr.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77425878
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
--- End diff --

Yes, but why would we want to pass something other than

sparkSession.sharedState.externalCatalog

?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77425776
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging {
 
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
+  val prunedFsRelation = fsRelation.location match {
--- End diff --

It appears `FileSourceScanExec` still does that. It's just redundant in 
this case.

If I understand correctly, I think that what you're asking was how things 
worked in my first commit. In my second commit, I put this code (and the 
`TableFileCatalog.filterPartitions` method) into the planning stage for two 
reasons.

First, if we need to do a file schema discovery and reconciliation against 
the metastore schema, we don't want to do that over the entire table. In my 
first commit, I avoided this operation by removing it completely. However, as a 
consequence it broke compatibility with parquet files with mixed case column 
names. As a fix, I planned to defer this schema reconciliation to physical 
planning, after the partition pruning. In the analysis phase, the analyzer 
would still use the metastore schema. This would be a compromise solution 
wherein only the pruned partitions' file schema would be scanned. However, I 
believe the work to retrieve and use the schema stored in the hive table 
properties will make this file scanning entirely unnecessary. That remains to 
be seen. I'm waiting for #14750 to be merged into master first.

Second, I've long been frustrated by the inability of the planner to use 
the data size of a partition-pruned relation in deciding whether to apply an 
automatic broadcast join. I just realized that this commit won't help here, 
since that decision is made with the logical plan before it gets here. I could 
explore pruning in the optimizer instead. In fact, that's how we've done it on 
an internal spark clone. It helps seeing the pruning done in the optimized plan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77424735
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -184,7 +184,7 @@ case class FileSourceScanExec(
 "Batched" -> supportsBatch.toString,
 "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
 "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+"RootPaths" -> relation.location.rootPaths.mkString(", "))
--- End diff --

Yeah, I think mainly you want to be able to differentiate between a 
metastore-backed and pure file-catalog relation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77423958
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -184,7 +184,7 @@ case class FileSourceScanExec(
 "Batched" -> supportsBatch.toString,
 "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
 "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+"RootPaths" -> relation.location.rootPaths.mkString(", "))
--- End diff --

I'm not quite sure what you're asking for here. If I replace `"RootPaths"` 
with an entry

"Location" -> relation.location

and implement appropriate `toString` methods for the `relation.location` 
classes, does that satisfy your concern?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77422471
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
+val values =
+  InternalRow.fromSeq(partitionSchema.map { case 
StructField(name, dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+Partition(values, files)
+  }
+  }
+case None =>
+  Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: 
Nil
+  }
+
+  override def refresh(): Unit = {}
+
+
+  /**
+   * Returns a [[ListingFileCatalog]] for this table restricted to the 
subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
--- End diff --

Usually the partitioned table is big (fact table), mostly broadcast join 
will not be picked even having the pruned statistics. btw, we have broadcast 
hint, it's fine to move the pruning into execution time, same as others.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419498
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = 
partitionSchema match {
+case Some(partitionSchema) =>
+  externalCatalog.listPartitionsByFilter(db, table, filters).flatMap {
+case CatalogTablePartition(spec, storage, _) =>
+  storage.locationUri.map(new Path(_)).map { path =>
+val files = listDataLeafFiles(path :: Nil).toSeq
+val values =
+  InternalRow.fromSeq(partitionSchema.map { case 
StructField(name, dataType, _, _) =>
+Cast(Literal(spec(name)), dataType).eval()
+  })
+Partition(values, files)
+  }
+  }
+case None =>
+  Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: 
Nil
+  }
+
+  override def refresh(): Unit = {}
+
+
+  /**
+   * Returns a [[ListingFileCatalog]] for this table restricted to the 
subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
--- End diff --

It seems a little weird to have catalogs that refer to a pruned table. We 
should try to do this at execution time instead, so that planning does not 
block behind pruning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging {
 
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
+  val prunedFsRelation = fsRelation.location match {
--- End diff --

Can we push this pruning into the scan (i.e. do it when computing 
`inputRDD` in `FileSourceScanExec`)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419510
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 ---
@@ -346,11 +340,30 @@ trait FileCatalog {
*/
   def listFiles(filters: Seq[Expression]): Seq[Partition]
 
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising 
a relation and, from
+ * those, infer the relation's partition specification.
+ */
+trait FileCatalog extends BasicFileCatalog {
--- End diff --

What's the motivation behind splitting FileCatalog and BasicFileCatalog? Is 
it to prevent accidental calls to allFiles()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419448
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -184,7 +184,7 @@ case class FileSourceScanExec(
 "Batched" -> supportsBatch.toString,
 "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
 "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-"InputPaths" -> relation.location.paths.mkString(", "))
+"RootPaths" -> relation.location.rootPaths.mkString(", "))
--- End diff --

Btw, it would be nice to make sure the physical plan still has a good debug 
string when you call explain (i.e. tells which catalog it's using) since that 
will greatly impact performance in this case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition 
columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+sparkSession: SparkSession,
+db: String,
+table: String,
+partitionSchema: Option[StructType],
+override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
--- End diff --

Can we make this an explicit constructor parameter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-09-02 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14690#discussion_r77419455
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2531,6 +2531,8 @@ class Dataset[T] private[sql](
*/
   def inputFiles: Array[String] = {
 val files: Seq[String] = logicalPlan.collect {
+  case LogicalRelation(HadoopFsRelation(_, location: FileCatalog, _, 
_, _, _, _), _, _) =>
--- End diff --

Hm, should we still have HadoopFsRelation implement FileRelation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...

2016-08-17 Thread mallman
GitHub user mallman opened a pull request:

https://github.com/apache/spark/pull/14690

[SPARK-16980][SQL] Load only catalog table partition metadata required to 
answer a query

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

(N.B. I'm submitting this PR as an enhanced version of an internal POC I 
wrote. I'm looking for preliminary feedback on what I have so far and to 
discuss some design and implementation issues. This PR is not currently a 
candidate for merging into master.)

(N.B. This PR is known to fail several unit tests related to Hive/Parquet 
conversion. Obviously, these failures will be addressed before this PR is 
submitted for merging into master.)

## What changes were proposed in this pull request?

In a new Spark application, when a partitioned Hive table is converted to 
use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every 
partition of that table are retrieved from the metastore and loaded into driver 
memory. In addition, every partition's metadata files are read from the 
filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's 
partitions, we would like to be able to answer that query without consulting 
partition metadata which are not involved in the query. When querying a table 
with a large number of partitions for some data from a small number of 
partitions (maybe even a single partition), the current conversion strategy is 
highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is 
inefficient in its use of driver memory. When the sum of the number of 
partitions of all tables loaded in a driver reaches a certain level (somewhere 
in the tens of thousands), their cached data exhaust all driver heap memory in 
the default configuration. I suspect this scenario is less common (in that not 
too many deployments work with tables with tens of thousands of partitions), 
however this does illustrate how large the memory footprint of this metadata 
can be. With tables with hundreds or thousands of partitions, I would expect 
the `HiveMetastoreCatalog` table cache to represent a significant portion of 
the driver's heap space.

This PR proposes an alternative approach. Basically, it makes three changes:

1. It adds a new method, `listPartitionsByFilter` to the Catalyst 
`ExternalCatalog` trait which returns the partition metadata for a given 
sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new 
`TableFileCatalog` to efficiently return files only for partitions matching a 
sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.

The net effect is that when a query over a partitioned Hive table is 
planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. 
As part of this operation, the `HiveMetastoreCatalog` builds a 
`HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition 
metadata or scan any files. The physical planner identifies the data files the 
query needs by asking the relation's `TableFileCatalog` for the files matching 
any predicate pruning predicates. The `TableFileCatalog` in turn calls the 
`listPartitionsByFilter` method on its external catalog. This queries the Hive 
metastore, passing along those filters.

## Open Issues

1. This PR omits partition metadata caching. I'm not sure if this is even 
needed if we're only loading partition metadata for a given query. However, it 
may not be that tricky to implement this effectively.
1. This PR removes and omits partitioned Hive table schema reconciliation. 
As a result, it fails to find Parquet schema columns with upper case letters 
because of the Hive metastore's case-insensitivity. I think this is the most 
significant hurdle for this PR. It just occurred to me that we might be able to 
do just-in-time schema reconciliation using the partitions that are used in a 
query. I haven't tried this, but I would attempt this by adding a method to 
`HadoopFsRelation` or `BasicFileCatalog` which returns a SQL schema for a given 
sequence of partition pruning predicates (or partitions). I'll give this a try 
and report back. Another idea would be to use the current strategy of merging 
schema from all table files unless the user sets a boolean SQL configuration 
parameter like `spark.sql.assumeLowerCaseColumnNames`. If the user's tables 
have only lower-case column names, then it's safe to use this PR's 
optimizations. I don't think this is an entirely unrealistic scenario as w
 e have enforced all lower-case column names from the beginning because of 
case-sensitivity issues. Maybe we're not the only ones?
1. This PR omits an implementation of `listPartitionsByFilter` for