[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r376844512 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. Review comment: classdoc is good enough This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368849527 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + sql( +s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 1000, 1).selectExpr("id as col") +.createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { +sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed Review comment: nit: `p > 0` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368849212 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { Review comment: nit `forall(_ > 0)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368849571 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + sql( +s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 1000, 1).selectExpr("id as col") +.createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { +sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed + val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368849466 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + sql( +s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 1000, 1).selectExpr("id as col") +.createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { +sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') Review comment: please keep the style of multiline string consistent. https://github.com/apache/spark/pull/26805/files#diff-90836f7778a704901d7d3df02846aa55R39 is corrected. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368849637 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + sql( +s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 1000, 1).selectExpr("id as col") +.createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { +sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed + val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed + assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 === Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368420662 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + withTempDir { dir => +sql( + s""" + |CREATE EXTERNAL TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile + |LOCATION '${dir.toURI}'""".stripMargin) + +spark.range(0, 1000, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") + +for (part <- Seq(1, 2, 3, 4)) { + sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) +} +val singlePartitionSizeInBytes = 3890 +val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed +val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed +assert(Optimize.execute(analyzed1).stats.sizeInBytes === singlePartitionSizeInBytes*4*12/16) Review comment: checking the accurate file size can be flaky. Can we just check that the first size is more than 3 times larger than the second size? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368420068 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("PruneHiveTablePartitions", Once, +EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { +withTable("test", "temp") { + withTempDir { dir => +sql( + s""" + |CREATE EXTERNAL TABLE test(i int) Review comment: does it have to be an external table? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368419917 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368390534 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, + pushedDownPartitionFilters = Some(partit
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368389539 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r368037093 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367909947 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, Review comment: yes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367909881 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367826109 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367826109 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), +partitionFilters.toSeq, conf.sessionLocalTimeZone) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { +rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { +totalSize.get + } else { +0L + } +} +if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes +} else { + tableMeta +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { +val newPartitions = prunePartitions(relation, partitionKeyFilters) +val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) +val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) +Pr
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367822954 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, Review comment: Sorry, I misread the code. So there are 2 optimizations: 1. push down predicates to the Hive metastore; 2. push down predicates earlier to get precise data size info. This rule is for the second optimization, and this branch is for skipping the first optimization. Makes sense to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367788000 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.internal.SQLConf + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf: SQLConf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, Review comment: can we remove this branch? We should always do the optimization. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367304505 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { Review comment: actually, nvm. Users can exclude optimizer rules to disable them. Let's follow `PruneFileSourcePartitions` and always do pruning. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367303681 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { Review comment: yea This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367229511 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { Review comment: then can we add a new config for this optimization? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r367229511 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { Review comment: then can we add a new config for this optimization? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366739954 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = try { + prunedPartitions.map { partition => +val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get +} else if (totalSize.isDefined && totalSize.get > 0L) { + totalSize.get +} else { + 0L +} + }.sum +} catch { + case e: IOException => Review comment: how can we throw IO exception? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366739843 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters.toSeq) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeOfPartitions = try { + prunedPartitions.map { partition => +val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get +} else if (totalSize.isDefined && totalSize.get > 0L) { + totalSize.get +} else { + 0L Review comment: what if some partitions have size stats and some don't? I think it's safer to not change the table stats if there is one partition that doesn't have size stats. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366738131 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def getPartitionKeyFilters( + filters: Seq[Expression], + relation: HiveTableRelation): ExpressionSet = { +val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) +val partitionColumnSet = AttributeSet(relation.partitionCols) +ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) +}) + } + + /** + * Prune the hive table using filters on the partitions of the table. + */ + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { +if (conf.metastorePartitionPruning) { Review comment: instead of checking the config here and listing all partitions if not enabled, we can simply check the config in `apply`: https://github.com/apache/spark/pull/26805/files#diff-6be42cfa3c62a7536b1eb1d6447c073cR99 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366458689 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => Review comment: Did you follow it? I don't see you filtering out non-deterministic filters and other things, as `PruneFileSourcePartitions.getPartitionKeyFilters` does. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366238516 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => Review comment: Yeah, but we can either leave it for a follow-up or fix it in this PR. The code newly added here should be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366238516 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => Review comment: Yeah, but we can leave it for a follow-up. The code newly added here should be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366226969 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => Review comment: I mean, it's already fixed: `ExternalCatalog.listPartitionsByFilter` guarantees that the returned partitions are exactly what we want. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366219935 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => Review comment: You are right that Hive may not support some predicates, so `HiveClientImpl.listPartitionByFilter` may return more partitions than we expect. But that should be fixed in `HiveExternalCatalog.listPartitionsByFilter`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r366219251 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => + require(filter.dataType == BooleanType, +s"Data type of predicate $filter must be ${BooleanType.catalogString} rather than " + + s"${filter.dataType.catalogString}.") + BindReferences.bindReference(filter, relation.partitionCols) +} +if (shouldKeep.nonEmpty) { + partitions.filter{ partition => +val hivePartition = + HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta)) +val dataTypes = relation.partitionCols.map(_.dataType) +val castedValues = hivePartition.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) } +val row = InternalRow.fromSeq(castedValues) +shouldKeep.get.eval(row).asInstanceOf[Boolean] + } +} else { + partitions +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeInBytes = try { + prunedPartitions.map { partition => +val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawData
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365788538 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => + require(filter.dataType == BooleanType, +s"Data type of predicate $filter must be ${BooleanType.catalogString} rather than " + + s"${filter.dataType.catalogString}.") + BindReferences.bindReference(filter, relation.partitionCols) +} +if (shouldKeep.nonEmpty) { + partitions.filter{ partition => +val hivePartition = + HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta)) +val dataTypes = relation.partitionCols.map(_.dataType) +val castedValues = hivePartition.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) } +val row = InternalRow.fromSeq(castedValues) +shouldKeep.get.eval(row).asInstanceOf[Boolean] + } +} else { + partitions +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeInBytes = try { + prunedPartitions.map { partition => +val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawData
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365788538 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => + require(filter.dataType == BooleanType, +s"Data type of predicate $filter must be ${BooleanType.catalogString} rather than " + + s"${filter.dataType.catalogString}.") + BindReferences.bindReference(filter, relation.partitionCols) +} +if (shouldKeep.nonEmpty) { + partitions.filter{ partition => +val hivePartition = + HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta)) +val dataTypes = relation.partitionCols.map(_.dataType) +val castedValues = hivePartition.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) } +val row = InternalRow.fromSeq(castedValues) +shouldKeep.get.eval(row).asInstanceOf[Boolean] + } +} else { + partitions +} + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { +val sizeInBytes = try { + prunedPartitions.map { partition => +val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawData
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365787617 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { +val partitions = +if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( +relation.tableMeta.identifier, partitionFilters) +} else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) +} +val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => Review comment: do we need this? AFAIK `ExternalCatalog.listPartitionsByFilter` does it already. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365786611 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table. 
+ */ + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) Review comment: nit: ``` def f( para1: T, para2: T): R = ... ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365786376 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => Review comment: let's follow `PruneFileSourcePartitions.getPartitionKeyFilters` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r365786236 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], Review comment: nit: 4 space indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r363162339 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -1375,6 +1375,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM = +buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum") +.doc("If the number of table partitions exceed this value, falling back to hdfs " + + "for statistics calculation is not allowed. This is used to avoid calculating " + + "the size of a large number of partitions through hdfs, which is very time consuming." + + "Setting this value to 0 or negative will disable falling back to hdfs for " + + "partition statistic calculation.") Review comment: If this is a common problem, let's leave it here and open a new PR to fix it completely later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362397234 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ## @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters(filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table, + * and also update the statistics of the table. 
+ */ + private def prunedHiveTableWithStats(relation: HiveTableRelation, + partitionFilters: Seq[Expression]): HiveTableRelation = { +val conf = session.sessionState.conf +val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + partitionFilters, + conf.sessionLocalTimeZone) +val sizeInBytes = try { + val partitionsWithSize = prunedPartitions.map { part => +val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawDataSize.get > 0) { + (part, rawDataSize.get) +} else if (totalSize.isDefined && totalSize.get > 0L) { + (part, totalSize.get) +} else { + (part, 0L) +} + } + val sizeOfPartitions = Review comment: shall we do it in `DetermineTableStats`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362397107 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ## @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters(filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table, + * and also update the statistics of the table. + */ + private def prunedHiveTableWithStats(relation: HiveTableRelation, + partitionFilters: Seq[Expression]): HiveTableRelation = { +val conf = session.sessionState.conf +val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( Review comment: We should call `SessionCatalog.listPartitionsByFilter` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362396970 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -1375,6 +1375,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM = +buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum") +.doc("If the number of table partitions exceed this value, falling back to hdfs " + + "for statistics calculation is not allowed. This is used to avoid calculating " + + "the size of a large number of partitions through hdfs, which is very time consuming." + + "Setting this value to 0 or negative will disable falling back to hdfs for " + + "partition statistic calculation.") Review comment: does the data source table have the same problem? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362396308 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ## @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) Review comment: let's move it to a separate file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362396438 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ## @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters(filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { +val normalizedFilters = filters.map { e => + e transform { +case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } +} +val partitionSet = AttributeSet(relation.partitionCols) +normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) +} + } + + /** + * Prune the hive table using filters on the partitions of the table, + * and also update the statistics of the table. 
+ */ + private def prunedHiveTableWithStats(relation: HiveTableRelation, + partitionFilters: Seq[Expression]): HiveTableRelation = { +val conf = session.sessionState.conf +val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + partitionFilters, + conf.sessionLocalTimeZone) +val sizeInBytes = try { + val partitionsWithSize = prunedPartitions.map { part => +val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +if (rawDataSize.isDefined && rawDataSize.get > 0) { + (part, rawDataSize.get) +} else if (totalSize.isDefined && totalSize.get > 0L) { + (part, totalSize.get) +} else { + (part, 0L) +} + } + val sizeOfPartitions = +if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) { + partitionsWithSize.map{ pair => +val (part, size) = (pair._1, pair._2) +if (size == 0) { + CommandUtils.calculateLocationSize( +session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) +} else { + size +} + }.sum +} else { + partitionsWithSize.filter(_._2>0).map(_._2).sum +} + // If size of partitions is zero fall back to the default size. 
+ if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions +} catch { + case e: IOException => +logWarning("Failed to get table size from HDFS.", e) +conf.defaultSizeInBytes +} +val withStats = + if (relation.tableMeta.stats.isDefined) { +relation.tableMeta.copy( + stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes + } else { +relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes + } +relation.copy(tableMeta = withStats, prunedPartitions = Some(prunedPartitions)) + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) Review comment: 2 space indentation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions URL: https://github.com/apache/spark/pull/26805#discussion_r362396387 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ## @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters(filters: Seq[Expression], Review comment: nit: ``` def func( para1: T, para2: T... ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org