[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14690 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83518291

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

> Why can't we just call client.getPartitionsByFilter in this method and put all other logic at caller side?

There are two reasons:

1. This method is intended to return exactly the partitions matching the given filter.
2. This method is called from multiple places. I feel this is an acceptable place to centralize this common logic.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83517834

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -17,32 +17,26 @@
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 /**
  * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
+ * @param rootPaths the list of root table paths to scan
  * @param parameters as set of options to control discovery
- * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
-    override val paths: Seq[Path],
--- End diff --

In earlier versions of this PR, these paths were renamed to `rootPaths` to indicate that they were only the filesystem "roots" for this table, with no partition directories. This restriction was relaxed in a later iteration of the PR to address a shortcoming of the new architecture. The name `paths` probably makes more sense now, but I'm reluctant to push any non-critical commits right now. The last rebase was a killer.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83356659

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

Well, `filtering the partition values again` seems like Hive-specific logic, as the Hive metastore may not support partition pruning and may still return all partitions. LGTM then
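The re-filtering agreed on above can be sketched generically (a hypothetical Python sketch, not the PR's Scala): on metastore versions without pruning support, `getPartitionsByFilter` may come back unfiltered, so the catalog re-applies the original predicate and returns exactly the matching partitions.

```python
# Hypothetical sketch of the centralized logic: treat the metastore result as
# a possible superset and prune it again with the original predicate, so
# every caller gets exactly the matching set.
def list_partitions_by_filter(metastore_partitions, predicate):
    """metastore_partitions: partition specs (dicts of column -> value),
    possibly a superset of the matching set."""
    return [p for p in metastore_partitions if predicate(p)]

# A metastore that ignores the pushed-down filter returns every partition:
unfiltered = [{"ds": "2016-10-13"}, {"ds": "2016-10-14"}]
exact = list_partitions_by_filter(unfiltered, lambda p: p["ds"] == "2016-10-14")
assert exact == [{"ds": "2016-10-14"}]
```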
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83355030

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

The modified unit tests did surface a bug, but it's tricky in how it manifested itself. Very briefly, when adding missing partitions for partition columns with an upper-case letter, `msck repair table` works, but `alter table ... add partition` does not. The latter actually creates new directories. For example, here's a partitioned table listing before calling `add partition`:

```
-rw-r--r--   3 msa msa 0 2016-10-14 03:52 test_mixed_case/_SUCCESS
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=0
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=1
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=2
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=3
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=4
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=5
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=6
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=7
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=8
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=9
```

Here's the listing after calling `add partition` for each partition in turn:

```
-rw-r--r--   3 msa msa 0 2016-10-14 03:52 test_mixed_case/_SUCCESS
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=0
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=1
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=2
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=3
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=4
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=5
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=6
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=7
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=8
drwxr-xr-x   - msa msa 0 2016-10-14 03:52 test_mixed_case/partCol=9
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=0
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=1
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=2
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=3
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=4
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=5
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=6
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=7
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=8
drwxr-xr-x   - msa msa 0 2016-10-14 03:53 test_mixed_case/partcol=9
```

I'll file an issue when I have a moment.
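The duplicate-directory bug above suggests the partition column name was lower-cased somewhere along the `add partition` path. A hypothetical Python sketch (not the Spark/Hive code) of the fix: resolve a user-supplied partition column name case-insensitively against the table's declared partition columns before building the directory name, so `partcol=0` can't be created alongside an existing `partCol=0`.

```python
# Hypothetical sketch: map a partition spec's column names onto the declared
# (canonical) partition columns, ignoring case, before forming the directory.
def partition_dir(declared_cols, spec):
    by_lower = {c.lower(): c for c in declared_cols}
    segments = []
    for name, value in spec.items():
        canonical = by_lower.get(name.lower())
        if canonical is None:
            raise ValueError(name + " is not a partition column")
        segments.append("%s=%s" % (canonical, value))
    return "/".join(segments)

# Mixed-case and lower-case references map to the same directory:
assert partition_dir(["partCol"], {"partCol": 0}) == "partCol=0"
assert partition_dir(["partCol"], {"partcol": 0}) == "partCol=0"
```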
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83352979

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

We probably could, but why?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83352838

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -626,6 +627,40 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
--- End diff --

Why can't we just call `client.getPartitionsByFilter` in this method and put all other logic at caller side?
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83349072

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

Ok. I'll report an issue if I find a problem.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83348167

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale classes
--- End diff --

I don't quite understand the issue, but if you reference the Counter object directly from the caller sites then you get

```
[info] Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.HiveDataFrameSuite *** ABORTED *** (12 seconds, 51 milliseconds)
[info]   java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/spark/sql/hive/client/IsolatedClientLoader$$anon$1) previously initiated loading for a different type with name "com/codahale/metrics/Counter"
[info]   at java.lang.ClassLoader.defineClass1(Native Method)
[info]   at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
[info]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[info]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
```
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83348200

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale classes
--- End diff --

Hm, maybe this is a load order issue
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83348022

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
+    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale classes
--- End diff --

I don't quite understand this comment. What issue do these 2 methods address?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83347741

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
--- End diff --

Ah, the reason you can't is that, to register this source, it needs to be in the list above. This made me realize I forgot to add it to the list, actually: https://github.com/VideoAmp/spark-public/pull/6
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83347636

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  def reset(): Unit = {
--- End diff --

should we mention that this is for testing only?
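The `reset()` under discussion relies on a small idiom: a Dropwizard-style `Counter` has `inc`/`dec` but no reset method, so the code zeroes each counter by decrementing it by its own current count. A minimal stand-in `Counter` (a sketch, not the codahale class) illustrates the effect:

```python
# Minimal stand-in for a Dropwizard-style Counter (inc/dec/getCount only).
class Counter:
    def __init__(self):
        self._count = 0
    def inc(self, n=1):
        self._count += n
    def dec(self, n=1):
        self._count -= n
    def get_count(self):
        return self._count

c = Counter()
c.inc(5)
c.dec(c.get_count())  # the reset() trick: decrement by the current count
assert c.get_count() == 0
```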
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83347251

--- Diff: core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---
@@ -60,3 +60,32 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
--- End diff --

should we move it to sql module?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83342839

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class ParquetDataWithKey(pQ: Int, intField: Int, stringField: String)
--- End diff --

Can you revert these changes? I think we are close to something mergeable.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83341181

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  override def listPartitionsByFilter(
+      db: String,
+      table: String,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
+    val catalogTable = client.getTable(db, table)
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      sys.error("Expected only partition pruning predicates: " +
+        predicates.reduceLeft(And))
+    }
+
+    val partitionSchema = catalogTable.partitionSchema
+
+    if (predicates.nonEmpty) {
+      val clientPrunedPartitions =
+        client.getPartitionsByFilter(catalogTable, predicates)
+      val boundPredicate =
+        InterpretedPredicate.create(predicates.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
--- End diff --

I'm on Linux, which should be case-sensitive. Were you able to reproduce with the snippet I tried locally? Anyways, we should probably address this issue in another ticket.
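The case-sensitivity concern about the binding step can be sketched generically (a hypothetical Python sketch, not the Scala): looking up a partition column's ordinal by attribute name with an exact comparison, as `indexWhere(_.name == att.name)` does, misses a mixed-case reference that a case-insensitive resolver would still have bound upstream.

```python
# Exact-match lookup, mirroring indexWhere(_.name == att.name):
def index_where_exact(field_names, name):
    return next((i for i, f in enumerate(field_names) if f == name), -1)

# Case-insensitive lookup, mirroring a case-insensitive resolver:
def index_where_insensitive(field_names, name):
    return next((i for i, f in enumerate(field_names) if f.lower() == name.lower()), -1)

schema = ["partCol", "ds"]
assert index_where_exact(schema, "partcol") == -1       # exact match misses
assert index_where_insensitive(schema, "partcol") == 0  # resolver-style match
```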
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83340821

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2602,7 +2602,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[String] = logicalPlan.collect {
+    val files: Seq[String] = queryExecution.optimizedPlan.collect {
--- End diff --

We only determine the partitions read after optimization, so it's necessary to read it from the optimized plan instead of the logical plan.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83340736

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2602,7 +2602,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[String] = logicalPlan.collect {
+    val files: Seq[String] = queryExecution.optimizedPlan.collect {
--- End diff --

why this change?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83340504

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---
@@ -477,6 +478,15 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.values.toSeq
   }
 
+  override def listPartitionsByFilter(
+      db: String,
+      table: String,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    // TODO: Provide an implementation
+    throw new UnsupportedOperationException(
+      "listPartitionsByFilter is not implemented")
--- End diff --

Because not all versions of the Hive metastore support partition pruning, our code expects that `listPartitionsByFilter` may still return all partitions. But anyway, throwing an exception here is least surprising.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83325529

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -225,13 +225,19 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for equality checking.
-  override val metadata: Map[String, String] = Map(
-    "Format" -> relation.fileFormat.toString,
-    "ReadSchema" -> outputSchema.catalogString,
-    "Batched" -> supportsBatch.toString,
-    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-    "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-    "InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val location = relation.location
+    val locationDesc =
+      location.getClass.getSimpleName + seqToString(location.rootPaths)
--- End diff --

This style emulates the way relations are shown in a query plan, e.g. `Relation[a#1, b#2]`. I'm in favor of keeping this as-is.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83325088 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), +_, +_)) +if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. + val normalizedFilters = filters.map { e => +e transform { + case a: AttributeReference => + a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) +} + } + + val sparkSession = fsRelation.sparkSession + val partitionColumns = +logicalRelation.resolve( + partitionSchema, sparkSession.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) + val partitionKeyFilters = + ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) + + if (partitionKeyFilters.nonEmpty) { + val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFsRelation = +fsRelation.copy(location = prunedFileCatalog)(sparkSession) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) + + // Keep partition-pruning predicates so that they are visible in physical planning + val filterExpression = filters.reduceLeft(And) --- End diff -- I pushed a commit to show partition count for partitioned tables. 
I also did some informal A/B perf testing keeping and omitting the partition pruning filters. I saw no discernible performance difference.
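The attribute-name normalization step in the rule quoted above can be illustrated without Spark. This is a minimal sketch: `Attr` stands in for `AttributeReference`, and an `exprId` field stands in for the semantic identity that `semanticEquals` compares, so a filter attribute spelled with different casing is rewritten to the schema's spelling (all names and values here are illustrative):

```scala
// Stand-in for AttributeReference: a name plus a stable expression id.
case class Attr(name: String, exprId: Int)

// Schema-cased attributes of the relation's output (hypothetical).
val output = Seq(Attr("pQ", 1), Attr("value", 2))

// A filter may reference the column with different casing but the same exprId.
val filterAttr = Attr("pq", 1)

// Normalize to the schema's spelling, mirroring the find/withName step in the rule.
val normalized = output
  .find(_.exprId == filterAttr.exprId)
  .map(schemaAttr => filterAttr.copy(name = schemaAttr.name))
  .getOrElse(filterAttr)
```

After this step, downstream code can compare attribute names without worrying about case sensitivity.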
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83318096 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) --- End diff -- > I think there is an issue in the modified test, since the ParquetDataWithKey class still has single-p as a field. That would cause false positive failures. > > I tried locally to reproduce a mixed-case issue, with no success, e.g. BTW, are you testing with a case-sensitive filesystem? My revised `ParquetMetastoreSuite` tests passed for me on a case-insensitive filesystem. I switched to a case-sensitive filesystem and they fail.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83302513 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -230,13 +230,21 @@ case class FileSourceScanExec( val location = relation.location val locationDesc = location.getClass.getSimpleName + seqToString(location.rootPaths) -Map( - "Format" -> relation.fileFormat.toString, - "ReadSchema" -> outputSchema.catalogString, - "Batched" -> supportsBatch.toString, - "PartitionFilters" -> seqToString(partitionFilters), - "PushedFilters" -> seqToString(dataFilters), - "Location" -> locationDesc) +val metadata = + Map( +"Format" -> relation.fileFormat.toString, +"ReadSchema" -> outputSchema.catalogString, +"Batched" -> supportsBatch.toString, +"PartitionFilters" -> seqToString(partitionFilters), +"PushedFilters" -> seqToString(dataFilters), +"Location" -> locationDesc) +val withOptPartitionCount = + relation.partitionSchemaOption.map { _ => +metadata + ("PartitionCount" -> selectedPartitions.size.toString) --- End diff -- +1
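The shape of the change in the diff above, reporting a `PartitionCount` entry only for partitioned relations, can be sketched in plain Scala. The values below are made-up stand-ins for `FileSourceScanExec`'s `partitionSchemaOption` and `selectedPartitions` members:

```scala
def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

// Stand-ins for FileSourceScanExec state (hypothetical values for illustration).
val partitionSchemaOption: Option[Seq[String]] = Some(Seq("ds"))
val selectedPartitions: Seq[String] = Seq("ds=2016-10-01", "ds=2016-10-02")

val metadata = Map(
  "Format" -> "Parquet",
  "PartitionFilters" -> seqToString(Seq("ds = 2016-10-01")))

// Add PartitionCount only when the relation is partitioned;
// unpartitioned relations keep the base metadata map unchanged.
val withOptPartitionCount = partitionSchemaOption
  .map(_ => metadata + ("PartitionCount" -> selectedPartitions.size.toString))
  .getOrElse(metadata)
```

With `partitionSchemaOption` set to `None`, the map passes through untouched, which is why unpartitioned scans show no `PartitionCount` line in their plan description.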
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83141382 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) --- End diff -- I tested this with unit tests from two test suites on two branches. The first test suite was `SQLQuerySuite` from the Hive codebase, specifically the test "SPARK-10562: partition by column with mixed case name". The second test suite was (a modified) `ParquetMetastoreSuite`. I modified the name of the partition column in the partitioned tables in the latter suite from `p` to `pQ`. The two branches on which I tested were this PR and commit 8d33e1e from the master branch. The first test suite passed on both branches. I guess that's to be expected since our Jenkins bot has been reporting it as passed. The second suite failed (as modified) on both branches. In both branches, Spark SQL failed to find the partitions on-disk. 
This makes me wonder: 1. Is this a known/accepted limitation? 2. If unknown, is this an acceptable limitation or a bug to be fixed? The best I found regarding support for mixed-case partition columns was in https://issues.apache.org/jira/browse/SPARK-10562. Unlike in the first test (which uses the `saveAsTable` method), the tables in `ParquetMetastoreSuite` are built with SQL DDL.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83131630 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -199,59 +197,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore - // are empty. - val partitions = metastoreRelation.getHiveQlPartitions().map { p => -val location = p.getLocation -val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) -}) -PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) --- End diff -- Ok, thanks.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83128973 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -199,59 +197,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore - // are empty. - val partitions = metastoreRelation.getHiveQlPartitions().map { p => -val location = p.getLocation -val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) -}) -PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) --- End diff -- Nope, that should be good to go.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83115827 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -199,59 +197,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore - // are empty. - val partitions = metastoreRelation.getHiveQlPartitions().map { p => -val location = p.getLocation -val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) -}) -PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) --- End diff -- Does this invalidate any of the work you've done in https://github.com/VideoAmp/spark-public/pull/3?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83106495 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -55,10 +55,16 @@ class TableFileCatalog( override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { case Some(partitionSchema) => - externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { + val catalogTablePartitions = externalCatalog.listPartitionsByFilter(db, table, filters) --- End diff -- Actually, this breaks inputFiles as well when there are mixed partition paths as in "SPARK-15248: explicitly added partitions should be readable". I'm going to look into this more.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83086945 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), +_, +_)) +if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. + val normalizedFilters = filters.map { e => +e transform { + case a: AttributeReference => + a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) +} + } + + val sparkSession = fsRelation.sparkSession + val partitionColumns = +logicalRelation.resolve( + partitionSchema, sparkSession.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) + val partitionKeyFilters = + ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) + + if (partitionKeyFilters.nonEmpty) { + val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFsRelation = +fsRelation.copy(location = prunedFileCatalog)(sparkSession) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) + + // Keep partition-pruning predicates so that they are visible in physical planning + val filterExpression = filters.reduceLeft(And) --- End diff -- Also, I'll try some A/B testing to examine the performance impact of removing these partition 
pruning filters here.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83085289 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) --- End diff -- I'll test this and report back.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83085625 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq --- End diff -- I very recently pushed a commit that refactors this to parallelize it.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83078271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -55,10 +55,16 @@ class TableFileCatalog( override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { case Some(partitionSchema) => - externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { + val catalogTablePartitions = externalCatalog.listPartitionsByFilter(db, table, filters) --- End diff -- Nevermind, this fails since we try to infer partitioning there.
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83065199 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) --- End diff -- Case sensitive? Or have we already made sure that they are in the same case somewhere?
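The concern raised here can be seen in miniature: an exact-equality `indexWhere` on the name misses a column that differs only in case, while a resolver-style comparison (the analyzer's default resolver is case-insensitive) finds it. This sketch uses a stand-in for `StructField`; the schema and lookup name are made up for illustration:

```scala
// Stand-in for StructField: only the name matters here.
case class Field(name: String)

// A mixed-case partition column, as in the ParquetMetastoreSuite experiment.
val partitionSchema = Seq(Field("pQ"))

// Exact match, as written in the diff: case-sensitive.
val exact = partitionSchema.indexWhere(_.name == "pq")

// A case-insensitive resolver, like the analyzer's default.
val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)
val resolved = partitionSchema.indexWhere(f => resolver(f.name, "pq"))
```

The exact lookup returns -1 (no column found) while the resolver-based lookup returns index 0, which is exactly the gap a reviewer would worry about if attribute names were not already normalized upstream.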
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83071827 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, --- End diff -- Should we put a TODO here (support other catalogs)?
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83067223 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq --- End diff -- Should we do this in parallel? (Could be another PR)
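One way the per-partition listing could be parallelized is a Future per partition directory, sketched below with a dummy `listDataLeafFiles` (the real method does filesystem I/O; Spark's actual fix uses its own parallel listing utilities, so this is only the general idea):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for listDataLeafFiles: pretend each partition directory holds two files.
def listDataLeafFiles(path: String): Seq[String] =
  Seq(s"$path/part-00000", s"$path/part-00001")

// Hypothetical partition locations returned by the metastore.
val partitionPaths = Seq("/warehouse/t/ds=1", "/warehouse/t/ds=2", "/warehouse/t/ds=3")

// Issue one listing per partition concurrently instead of sequentially,
// then wait for all of them and flatten into a single file list.
val futures = partitionPaths.map(p => Future(listDataLeafFiles(p)))
val allFiles = Await.result(Future.sequence(futures), 30.seconds).flatten
```

For a table with thousands of partitions on a remote filesystem, the wall-clock win comes from overlapping the per-directory round trips rather than doing them one at a time.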
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83065354 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -616,6 +617,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) +BoundReference(index, partitionSchema(index).dataType, nullable = true) +}) + clientPrunedPartitions.filter { case CatalogTablePartition(spec, _, _) => +val row = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +boundPredicate(row) --- End diff -- We should have a logging for false-positive from Hive client? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
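A rough sketch of the false-positive logging suggested above. `clientPrunedPartitions` is a stand-in for `HiveClient.getPartitionsByFilter` (which may return a superset when Hive cannot evaluate part of the predicate), and real code would use Spark's `Logging` trait rather than `println`:

```scala
object ClientPruningWithLogging {
  type PartitionSpec = Map[String, String]

  // Stand-in for HiveClient.getPartitionsByFilter: may over-return.
  def clientPrunedPartitions: Seq[PartitionSpec] = Seq(
    Map("ds" -> "2016-10-01"),
    Map("ds" -> "2016-10-02"),
    Map("ds" -> "2016-10-03"))

  // Re-apply the exact predicate on the driver and log how often the
  // Hive client over-returned, as the review comment suggests.
  def prune(exactPredicate: PartitionSpec => Boolean): Seq[PartitionSpec] = {
    val candidates = clientPrunedPartitions
    val exact = candidates.filter(exactPredicate)
    val falsePositives = candidates.size - exact.size
    if (falsePositives > 0) {
      println(s"Hive client returned $falsePositives false-positive partition(s)")
    }
    exact
  }

  def main(args: Array[String]): Unit = {
    val survivors = prune(spec => spec("ds").compareTo("2016-10-02") >= 0)
    assert(survivors.size == 2)
  }
}
```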
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83072226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), +_, +_)) +if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. + val normalizedFilters = filters.map { e => +e transform { + case a: AttributeReference => + a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) +} + } + + val sparkSession = fsRelation.sparkSession + val partitionColumns = +logicalRelation.resolve( + partitionSchema, sparkSession.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) + val partitionKeyFilters = + ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) + + if (partitionKeyFilters.nonEmpty) { + val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFsRelation = +fsRelation.copy(location = prunedFileCatalog)(sparkSession) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) + + // Keep partition-pruning predicates so that they are visible in physical planning + val filterExpression = filters.reduceLeft(And) --- End diff -- If the partitions/files are already filtered out, do we really need to keep these predicates?
Dropping them would also help performance.
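The rule's central test, whether a filter references only partition columns, decides which predicates can prune partitions and which must stay in the plan as data filters. A minimal model of that split, using a toy `Filter` type rather than Catalyst expressions:

```scala
object FilterSplit {
  // Toy stand-in for a Catalyst expression: a description plus the
  // column names the filter references.
  final case class Filter(sql: String, references: Set[String])

  // Partition-key filters reference only partition columns; everything
  // else must remain in the plan as a data filter.
  def split(filters: Seq[Filter], partitionCols: Set[String]): (Seq[Filter], Seq[Filter]) =
    filters.partition(_.references.subsetOf(partitionCols))

  def main(args: Array[String]): Unit = {
    val (partitionFilters, dataFilters) = split(
      Seq(Filter("ds = '2016-10-01'", Set("ds")), Filter("clicks > 10", Set("clicks"))),
      partitionCols = Set("ds"))
    assert(partitionFilters.map(_.sql) == Seq("ds = '2016-10-01'"))
    assert(dataFilters.map(_.sql) == Seq("clicks > 10"))
  }
}
```

Whatever is decided about retaining the partition-key filters after pruning, the data filters on the other side of the split must be kept, since pruning directories says nothing about rows inside the surviving files.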
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83071819 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -55,10 +55,16 @@ class TableFileCatalog( override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { case Some(partitionSchema) => - externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { + val catalogTablePartitions = externalCatalog.listPartitionsByFilter(db, table, filters) --- End diff -- Would filterPartitions(filters).listFiles(Nil) work here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83061877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,19 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +val locationDesc = + location.getClass.getSimpleName + seqToString(location.rootPaths) --- End diff -- Should they be separated by space? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
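For reference, a toy version of the formatting in question; with the separating space davies suggests, the catalog class name and the path list read as distinct tokens:

```scala
object LocationMetadata {
  def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

  // With a space: "TableFileCatalog [/warehouse/t]"
  // Without:      "TableFileCatalog[/warehouse/t]"
  def locationDesc(catalogClass: String, rootPaths: Seq[String]): String =
    catalogClass + " " + seqToString(rootPaths)

  def main(args: Array[String]): Unit = {
    assert(locationDesc("TableFileCatalog", Seq("/warehouse/t")) == "TableFileCatalog [/warehouse/t]")
  }
}
```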
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r83036315 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), +_, +_)) +if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. + val normalizedFilters = filters.map { e => --- End diff -- I tried this before. IIRC, the issue is that in a join query the `JoinSelection` rule runs before the `FileSourceStrategy` because it matches the logical plan first. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82902172 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- > Btw, you might want to take(10) on the root paths to avoid materializing a large string here before it gets truncated. Actually, since `metadata` is used in various places for query plan equivalence checking it seems we need to build the whole string, but with sorted paths: seqToString(location.rootPaths.sorted) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
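The fix mallman describes can be shown with a toy `seqToString`: sorting the root paths before rendering makes the metadata string independent of listing order, which is what plan-equality checking needs, at the cost of materializing the full string. A sketch, not the actual `FileSourceScanExec` code:

```scala
object StableMetadata {
  def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

  // Two equivalent relations may enumerate the same root paths in
  // different orders; sorting first keeps the rendered string stable.
  def stableLocationDesc(rootPaths: Seq[String]): String =
    seqToString(rootPaths.sorted)

  def main(args: Array[String]): Unit = {
    val a = Seq("/t/ds=2016-10-02", "/t/ds=2016-10-01")
    val b = a.reverse
    assert(stableLocationDesc(a) == stableLocationDesc(b))
  }
}
```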
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82900810 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- > Is it different from location.getRootPaths.length? I guess that would only be meaningful for Hive-backed tables, but that seems ok. This is true for relations that have been pruned by `PruneFileSourcePartitions`. I'm looking for a tidier solution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82875191 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- Is it different from location.getRootPaths.length? I guess that would only be meaningful for Hive-backed tables, but that seems ok. Btw, you might want to take(10) on the root paths to avoid materializing a large string here before it gets truncated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82850487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- I'm going to pull this value out to a local `val`. The reason I don't want to make this part of the `BasicFileCatalog` type is that this format seems specific to this class's `metadata` method. For example, we're using the method-local `seqToString` method to format the `rootPaths` list. Regarding the partition count, this is actually a little trickier than I expected. I'm going to expand on that in a standalone PR comment after the next push. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82715947 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, --- End diff -- Not sure, but the indentation is more consistent with a newline after `Map(`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82715954 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala --- @@ -477,6 +478,15 @@ class InMemoryCatalog( catalog(db).tables(table).partitions.values.toSeq } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = { +// TODO: Provide an implementation +throw new UnsupportedOperationException( + "listPartitionsByFilter is not implemented") --- End diff -- That's true. I guess this is fine for now; we can add back an implementation later when needed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82713318 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- I considered that. I agree this doesn't look very good. I'll turn this over in my head and push something better. Also, I'll add the partition count. Good idea there. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82713068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, --- End diff -- I don't know. Does Spark have a guideline for this? (Also, I'm not entirely sure I know what you're referring to here.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82712965 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala --- @@ -477,6 +478,15 @@ class InMemoryCatalog( catalog(db).tables(table).partitions.values.toSeq } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = { +// TODO: Provide an implementation +throw new UnsupportedOperationException( + "listPartitionsByFilter is not implemented") --- End diff -- It isn't? In the spirit of the principle of least surprise, my intention for this method was that it return exactly the partitions matching the given predicates. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82708580 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case op @ PhysicalOperation(projects, filters, +logicalRelation @ + LogicalRelation(fsRelation @ +HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), +_, +_)) +if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. + val normalizedFilters = filters.map { e => --- End diff -- Could we de-duplicate this code from that in FileSourceStrategy? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
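One shape a shared normalization helper could take, shown on a toy expression tree instead of Catalyst's `Expression` (purely illustrative; the real helper would resolve names through the session's `Resolver` rather than lower-casing):

```scala
object FilterNormalization {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  // Rewrite attribute names to the canonical (schema) spelling so that
  // later pruning need not worry about case sensitivity. Extracting a
  // helper like this is one way to de-duplicate the logic with
  // FileSourceStrategy.
  def normalize(e: Expr, canonical: Map[String, String]): Expr = e match {
    case Attr(n)       => Attr(canonical.getOrElse(n.toLowerCase, n))
    case EqualTo(l, r) => EqualTo(normalize(l, canonical), normalize(r, canonical))
    case other         => other
  }

  def main(args: Array[String]): Unit = {
    // Canonical names keyed by their lower-cased form.
    val schema = Map("ds" -> "ds", "eventtype" -> "eventType")
    val normalized = normalize(EqualTo(Attr("EVENTTYPE"), Lit("click")), schema)
    assert(normalized == EqualTo(Attr("eventType"), Lit("click")))
  }
}
```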
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82708474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + + +/** + * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf + * files in a list of HDFS paths. + * + * @param sparkSession a [[SparkSession]] + * @param ignoreFileNotFound (see [[ListingFileCatalog]]) + */ +abstract class SessionFileCatalog(sparkSession: SparkSession) --- End diff -- Note to self: moved verbatim --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82709034 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -619,6 +620,44 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.getPartition(db, table, spec) } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { +val catalogTable = client.getTable(db, table) +val partitionColumnNames = catalogTable.partitionColumnNames.toSet +val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) +} + +if (nonPartitionPruningPredicates.nonEmpty) { +sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) +} + +val partitionSchema = catalogTable.partitionSchema + +if (predicates.nonEmpty) { + val clientPrunedPartitions = +client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = +InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => +val index = partitionSchema.indexWhere(_.name == att.name) +BoundReference(index, partitionSchema(index).dataType, nullable = true) +}) + clientPrunedPartitions.filter { case CatalogTablePartition(spec, _, _) => +val row = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +boundPredicate(row) + } +} else { --- End diff -- Any way to de-dup the pruning logic from PartitioningAwareFileCatalog? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
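mallman's rationale above can be illustrated with a toy model of the two-stage pruning: the catalog first rejects any predicate that touches a non-partition column, then re-filters the client-pruned partitions so the result contains exactly the matching partitions. This is a plain-Scala sketch, not the Catalyst code: `Predicate` here is a hypothetical stand-in for an `Expression`, reduced to its referenced columns plus an evaluation function over a partition spec.

```scala
// Hypothetical stand-in for a Catalyst Expression: the set of columns it
// references plus an evaluation function over a partition spec.
case class Predicate(references: Set[String], eval: Map[String, String] => Boolean)

def listPartitionsByFilter(
    partitions: Seq[Map[String, String]],  // each element is one partition's spec
    partitionColumns: Set[String],
    predicates: Seq[Predicate]): Seq[Map[String, String]] = {
  // Mirrors the nonPartitionPruningPredicates check in the diff: every
  // predicate must reference only partition columns, or the call is an error.
  val nonPruning = predicates.filterNot(_.references.subsetOf(partitionColumns))
  require(nonPruning.isEmpty, s"Expected only partition pruning predicates: $nonPruning")
  // Re-filter client-side so the result is exactly the matching partitions.
  partitions.filter(spec => predicates.forall(_.eval(spec)))
}
```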
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82707347 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala --- @@ -477,6 +478,15 @@ class InMemoryCatalog( catalog(db).tables(table).partitions.values.toSeq } + override def listPartitionsByFilter( + db: String, + table: String, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = { +// TODO: Provide an implementation +throw new UnsupportedOperationException( + "listPartitionsByFilter is not implemented") --- End diff -- I think you can just return all the partitions, since it's not required for correctness.
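The suggestion above works because `listPartitionsByFilter` is an optimization: callers re-apply the predicates themselves, so an over-approximation is safe for correctness, only slower. A minimal sketch of that fallback, with names simplified from the PR:

```scala
// Simplified stand-in for the catalyst type in the diff above.
case class CatalogTablePartition(spec: Map[String, String])

class InMemoryCatalogSketch(all: Seq[CatalogTablePartition]) {
  // The filters are accepted but ignored: returning a superset of the
  // matching partitions is safe because callers prune again themselves.
  def listPartitionsByFilter(predicates: Seq[Any]): Seq[CatalogTablePartition] = all
}
```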
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82707709 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths))) --- End diff -- Should this be a method of location? Btw, including the number of partitions would be nice too, since it gets truncated.
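The refactored metadata construction in the diff can be exercised in isolation. This sketch stubs out the relation with a hypothetical `Location` case class to show how the `seqToString` helper and the combined "Location" entry behave; the real field is a `FileCatalog` on a `HadoopFsRelation`.

```scala
// Hypothetical stub for relation.location, for illustration only.
case class Location(rootPaths: Seq[String])

def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

def scanMetadata(location: Location, partitionFilters: Seq[String]): Map[String, String] =
  Map(
    "PartitionFilters" -> seqToString(partitionFilters),
    // Catalog class name plus root paths, as in the diff; ericl also
    // suggests surfacing the partition count, since long lists truncate.
    "Location" -> (location.getClass.getSimpleName + seqToString(location.rootPaths)))
```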
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r82707684 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -225,13 +225,16 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. - override val metadata: Map[String, String] = Map( -"Format" -> relation.fileFormat.toString, -"ReadSchema" -> outputSchema.catalogString, -"Batched" -> supportsBatch.toString, -"PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), -"PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { +def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") +val location = relation.location +Map("Format" -> relation.fileFormat.toString, --- End diff -- nit: should be on next line as well?
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77428539 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala --- @@ -346,11 +340,30 @@ trait FileCatalog { */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} + +/** + * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from + * those, infer the relation's partition specification. + */ +trait FileCatalog extends BasicFileCatalog { --- End diff -- Makes sense. I feel like the naming here is a little weird but maybe it's best to clean this up in a separate PR to avoid too many changes here.
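The trait split under discussion can be summarized in miniature: only the richer trait promises full enumeration, so a metastore-backed catalog can implement the base trait without ever listing the whole table. Members and types are heavily simplified here (strings instead of `Expression`/`Partition`); only the trait names follow the PR.

```scala
trait BasicFileCatalog {
  // Partition-pruned listing: only files matching the filters.
  def listFiles(filters: Seq[String]): Seq[String]
  def sizeInBytes: Long
}

trait FileCatalog extends BasicFileCatalog {
  // Full enumeration lives only here, so catalogs that cannot list cheaply
  // (e.g. a TableFileCatalog over a large table) simply don't mix this in.
  def allFiles(): Seq[String]
}
```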
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77428351 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq +val values = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +Partition(values, files) + } + } +case None => + Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: Nil + } + + override def refresh(): Unit = {} + + + /** + * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { --- End diff -- > To be more specific, this is particularly helpful in validating correct pruning behavior in left and right outer joins. 
Sure, an expert will understand which sides' partition pruning filters need to go in the "join" clause and which in the "where" clause. This is not the case with less experienced users, and even experienced users can forget which goes where and when. Btw I think you could look at pushed partition filters in the physical scan node to determine if pruning is happening as expected. > Yes, have a broadcast hint. The problem we've had with that is that we haven't found a way to apply that in SQL queries. (Is there a syntax in the parser for that?) There's a PR out for this (not yet merged): https://github.com/apache/spark/pull/14426
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77426981 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2531,6 +2531,8 @@ class Dataset[T] private[sql]( */ def inputFiles: Array[String] = { val files: Seq[String] = logicalPlan.collect { + case LogicalRelation(HadoopFsRelation(_, location: FileCatalog, _, _, _, _, _), _, _) => --- End diff -- Good question. It seems a call to `Dataset.inputFiles` which reads from a partitioned table will return an empty array here. On the other hand, asking for the input files for a large partitioned table will require a costly tree traversal. I'll have to think about this some more before I have an opinion.
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77426759 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala --- @@ -346,11 +340,30 @@ trait FileCatalog { */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} + +/** + * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from + * those, infer the relation's partition specification. + */ +trait FileCatalog extends BasicFileCatalog { --- End diff -- Yes, I want to avoid/discourage file traversals of partitions which aren't part of a user's query. Moving `allFiles()` out of the root of the file catalog hierarchy lets us omit it from `TableFileCatalog`.
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77426633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq +val values = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +Partition(values, files) + } + } +case None => + Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: Nil + } + + override def refresh(): Unit = {} + + + /** + * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { --- End diff -- I see your point about avoiding the overhead of pruning in planning. The flip side of that is surfacing information valuable to both the planner and the user in the query plan itself. 
This is why our (VideoAmp's) clone of Spark does partition pruning in the optimization phase (as it was in the early days of Spark SQL, I think Spark pre-1.3). We've found that seeing the actual partitions that will be read in the query plan (in "explain extended", for example) has been invaluable in validating that a query is going to read the files we think it's going to read instead of the entire freakin' table. To be more specific, this is particularly helpful in validating correct pruning behavior in left and right outer joins. Sure, an expert will understand which sides' partition pruning filters need to go in the "join" clause and which in the "where" clause. This is not the case with less experienced users and even experienced users can forget which goes where and when. In practice, performing partition pruning in query planning has not presented a problem for us. The operation is either performed in planning or in execution. Pe
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77426636 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog --- End diff -- Maybe it's ok for now, but I think it makes the dependencies more clear, otherwise it's not obvious this class basically delegates most of its work to `ExternalCatalog`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77426535 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") + val prunedFsRelation = fsRelation.location match { --- End diff -- That's correct. Let's wait for #14750 then. My feeling here is that schema inference over a subset of files is not acceptable since then the schema can vary with the partition predicate. For the broadcast join optimization, I think that is a larger issue that should not affect the design of this PR.
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77425878 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog --- End diff -- Yes, but why would we want to pass something other than sparkSession.sharedState.externalCatalog ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77425776 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") + val prunedFsRelation = fsRelation.location match { --- End diff -- It appears `FileSourceScanExec` still does that. It's just redundant in this case. If I understand correctly, I think that what you're asking was how things worked in my first commit. In my second commit, I put this code (and the `TableFileCatalog.filterPartitions` method) into the planning stage for two reasons. First, if we need to do a file schema discovery and reconciliation against the metastore schema, we don't want to do that over the entire table. In my first commit, I avoided this operation by removing it completely. However, as a consequence it broke compatibility with parquet files with mixed case column names. As a fix, I planned to defer this schema reconciliation to physical planning, after the partition pruning. In the analysis phase, the analyzer would still use the metastore schema. This would be a compromise solution wherein only the pruned partitions' file schema would be scanned. However, I believe the work to retrieve and use the schema stored in the hive table properties will make this file scanning entirely unnecessary. That remains to be seen. I'm waiting for #14750 to be merged into master first. Second, I've long been frustrated by the inability of the planner to use the data size of a partition-pruned relation in deciding whether to apply an automatic broadcast join. I just realized that this commit won't help here, since that decision is made with the logical plan before it gets here. 
I could explore pruning in the optimizer instead. In fact, that's how we've done it on an internal Spark clone. It helps seeing the pruning done in the optimized plan.
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77424735 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -184,7 +184,7 @@ case class FileSourceScanExec( "Batched" -> supportsBatch.toString, "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) +"RootPaths" -> relation.location.rootPaths.mkString(", ")) --- End diff -- Yeah, I think mainly you want to be able to differentiate between a metastore-backed and pure file-catalog relation.
[GitHub] spark pull request #14690: [SPARK-16980][SQL] Load only catalog table partit...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77423958 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -184,7 +184,7 @@ case class FileSourceScanExec( "Batched" -> supportsBatch.toString, "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) +"RootPaths" -> relation.location.rootPaths.mkString(", ")) --- End diff -- I'm not quite sure what you're asking for here. If I replace `"RootPaths"` with an entry "Location" -> relation.location and implement appropriate `toString` methods for the `relation.location` classes, does that satisfy your concern?
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77422471 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq +val values = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +Partition(values, files) + } + } +case None => + Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: Nil + } + + override def refresh(): Unit = {} + + + /** + * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { --- End diff -- Usually the partitioned table is big (fact table), mostly broadcast join will not be picked even having the pruned statistics. 
Btw, we have a broadcast hint; it's fine to move the pruning into execution time, same as the others.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419498 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = partitionSchema match { +case Some(partitionSchema) => + externalCatalog.listPartitionsByFilter(db, table, filters).flatMap { +case CatalogTablePartition(spec, storage, _) => + storage.locationUri.map(new Path(_)).map { path => +val files = listDataLeafFiles(path :: Nil).toSeq +val values = + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => +Cast(Literal(spec(name)), dataType).eval() + }) +Partition(values, files) + } + } +case None => + Partition(InternalRow.empty, listDataLeafFiles(rootPaths).toSeq) :: Nil + } + + override def refresh(): Unit = {} + + + /** + * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { --- End diff -- It seems a little weird to have catalogs that refer to a pruned table. We should try to do this at execution time instead, so that planning does not block behind pruning. 
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419464 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -79,8 +79,16 @@ object FileSourceStrategy extends Strategy with Logging { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") + val prunedFsRelation = fsRelation.location match { --- End diff -- Can we push this pruning into the scan (i.e. do it when computing `inputRDD` in `FileSourceScanExec`)?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419510 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala --- @@ -346,11 +340,30 @@ trait FileCatalog { */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} + +/** + * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from + * those, infer the relation's partition specification. + */ +trait FileCatalog extends BasicFileCatalog { --- End diff -- What's the motivation behind splitting FileCatalog and BasicFileCatalog? Is it to prevent accidental calls to allFiles()?
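One plausible reading of the split, hedged since the quoted diff shows only part of the hierarchy: the base trait carries the minimal, pruning-friendly contract, while the subtrait adds full enumeration. A simplified sketch with stand-in types (filters as strings rather than Catalyst `Expression`s):

```scala
// Simplified reading of the trait split quoted above. Expression and
// Partition are replaced by stand-in types; only the overall shape follows
// the diff.
case class PartitionSketch(values: Map[String, String], files: Seq[String])

trait BasicFileCatalogSketch {
  // Minimal contract: list files matching a set of partition-pruning filters.
  def listFiles(filters: Seq[String]): Seq[PartitionSketch]
  // Refresh any cached file listings.
  def refresh(): Unit
  // Sum of table file sizes, in bytes.
  def sizeInBytes: Long
}

// The richer contract can enumerate all files. For a metastore-backed table
// that would force loading every partition's metadata, so keeping allFiles()
// on a subtrait prevents accidental calls on catalogs that only support
// filtered listing.
trait FileCatalogSketch extends BasicFileCatalogSketch {
  def allFiles(): Seq[String]
}
```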
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419448 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -184,7 +184,7 @@ case class FileSourceScanExec( "Batched" -> supportsBatch.toString, "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), -"InputPaths" -> relation.location.paths.mkString(", ")) +"RootPaths" -> relation.location.rootPaths.mkString(", ")) --- End diff -- Btw, it would be nice to make sure the physical plan still has a good debug string when you call explain (i.e. tells which catalog it's using) since that will greatly impact performance in this case.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419496 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. 
+ * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( +sparkSession: SparkSession, +db: String, +table: String, +partitionSchema: Option[StructType], +override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog --- End diff -- Can we make this an explicit constructor parameter?
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14690#discussion_r77419455 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2531,6 +2531,8 @@ class Dataset[T] private[sql]( */ def inputFiles: Array[String] = { val files: Seq[String] = logicalPlan.collect { + case LogicalRelation(HadoopFsRelation(_, location: FileCatalog, _, _, _, _, _), _, _) => --- End diff -- Hm, should we still have HadoopFsRelation implement FileRelation?
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/14690 [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query (This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.) (N.B. I'm submitting this PR as an enhanced version of an internal POC I wrote. I'm looking for preliminary feedback on what I have so far and to discuss some design and implementation issues. This PR is not currently a candidate for merging into master.) (N.B. This PR is known to fail several unit tests related to Hive/Parquet conversion. Obviously, these failures will be addressed before this PR is submitted for merging into master.) ## What changes were proposed in this pull request? In a new Spark application, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table is retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference. If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting metadata for partitions that are not involved in the query. When querying a table with a large number of partitions for data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild. In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the total number of partitions across all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), the cached metadata can exhaust all driver heap memory in the default configuration.
I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions); however, it does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space. This PR proposes an alternative approach. Basically, it makes three changes: 1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates. 1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates. 1. It removes partition loading and caching from `HiveMetastoreCatalog`. The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The physical planner identifies the data files the query needs by asking the relation's `TableFileCatalog` for the files matching the partition pruning predicates. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters. ## Open Issues 1. This PR omits partition metadata caching. I'm not sure if this is even needed if we're only loading partition metadata for a given query. However, it may not be that tricky to implement this effectively. 1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity.
I think this is the most significant hurdle for this PR. It just occurred to me that we might be able to do just-in-time schema reconciliation using the partitions that are used in a query. I haven't tried this, but I would attempt this by adding a method to `HadoopFsRelation` or `BasicFileCatalog` which returns a SQL schema for a given sequence of partition pruning predicates (or partitions). I'll give this a try and report back. Another idea would be to use the current strategy of merging schemas from all table files unless the user sets a boolean SQL configuration parameter like `spark.sql.assumeLowerCaseColumnNames`. If the user's tables have only lower-case column names, then it's safe to use this PR's optimizations. I don't think this is an entirely unrealistic scenario, as we have enforced all lower-case column names from the beginning because of case-sensitivity issues. Maybe we're not the only ones? 1. This PR omits an implementation of `listPartitionsByFilter` for
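Taken together, the three changes listed under "What changes were proposed" can be sketched end to end. This is a hypothetical simplification: the `Sketch` types and string filters stand in for Spark's Catalyst `Expression`s and catalog classes, and none of these signatures are the exact Spark API:

```scala
// End-to-end sketch of the proposed flow, with stand-in types.
case class CatalogTablePartitionSketch(spec: Map[String, String], location: String)

trait ExternalCatalogSketch {
  // Change 1: the metastore returns only partitions matching the predicates.
  def listPartitionsByFilter(
      db: String,
      table: String,
      filters: Seq[String]): Seq[CatalogTablePartitionSketch]
}

// Change 2: a file catalog that defers to the metastore rather than listing
// and caching every partition up front (change 3 removes that caching).
class TableFileCatalogSketch(
    catalog: ExternalCatalogSketch,
    db: String,
    table: String) {
  def listFiles(filters: Seq[String]): Seq[String] =
    catalog.listPartitionsByFilter(db, table, filters).map(_.location)
}

object PrunedListingDemo extends App {
  val catalog = new ExternalCatalogSketch {
    private val partitions = Seq(
      CatalogTablePartitionSketch(Map("ds" -> "2016-08-16"), "/t/ds=2016-08-16"),
      CatalogTablePartitionSketch(Map("ds" -> "2016-08-17"), "/t/ds=2016-08-17"))
    // Toy filter evaluation: each filter string is an exact "ds" value.
    def listPartitionsByFilter(db: String, table: String, filters: Seq[String]) =
      partitions.filter(p => filters.forall(f => p.spec("ds") == f))
  }
  val files = new TableFileCatalogSketch(catalog, "db", "t")
    .listFiles(Seq("2016-08-17"))
  println(files) // List(/t/ds=2016-08-17)
}
```

The point of the sketch is the inversion: the file catalog never enumerates the whole table, so driver memory scales with the partitions a query touches rather than with the table's total partition count.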