[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-02-10 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r377418518
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
 
 Review comment:
   @gatorsmile @cloud-fan classdoc added in #27535 , please help review, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-02-07 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r376695382
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
 
 Review comment:
   sure, so you mean just add class description in 
PruneHiveTablePartitions.scala and PruneFileSourcePartitions.scala file ? Or 
need to add comment in some doc ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-31 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366133849
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
+  require(filter.dataType == BooleanType,
+s"Data type of predicate $filter must be ${BooleanType.catalogString} 
rather than " +
+  s"${filter.dataType.catalogString}.")
+  BindReferences.bindReference(filter, relation.partitionCols)
+}
+if (shouldKeep.nonEmpty) {
+  partitions.filter{ partition =>
+val hivePartition =
+  HiveClientImpl.toHivePartition(partition, 
HiveClientImpl.toHiveTable(relation.tableMeta))
+val dataTypes = relation.partitionCols.map(_.dataType)
+val castedValues = hivePartition.getValues.asScala.zip(dataTypes)
+  .map { case (value, dataType) => cast(Literal(value), 
dataType).eval(null) }
+val row = InternalRow.fromSeq(castedValues)
+shouldKeep.get.eval(row).asInstanceOf[Boolean]
+  }
+} else {
+  partitions
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeInBytes = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-21 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368856739
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("PruneHiveTablePartitions", Once,
+EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going throuhg 
PruneHiveTablePartitions") {
+withTable("test", "temp") {
+  sql(
+s"""
+   |CREATE TABLE test(i int)
+   |PARTITIONED BY (p int)
+   |STORED AS textfile""".stripMargin)
+  spark.range(0, 1000, 1).selectExpr("id as col")
+.createOrReplaceTempView("temp")
+
+  for (part <- Seq(1, 2, 3, 4)) {
+sql(s"""
+   |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+   |select col from temp""".stripMargin)
+  }
+  val analyzed1 = sql("select i from test where 
p>0").queryExecution.analyzed
+  val analyzed2 = sql("select i from test where 
p=1").queryExecution.analyzed
+  assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 ===
 
 Review comment:
   updated the code style, thanks a lot. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-21 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368856556
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("PruneHiveTablePartitions", Once,
+EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going throuhg 
PruneHiveTablePartitions") {
+withTable("test", "temp") {
+  sql(
+s"""
+   |CREATE TABLE test(i int)
+   |PARTITIONED BY (p int)
+   |STORED AS textfile""".stripMargin)
+  spark.range(0, 1000, 1).selectExpr("id as col")
+.createOrReplaceTempView("temp")
+
+  for (part <- Seq(1, 2, 3, 4)) {
+sql(s"""
+   |INSERT OVERWRITE TABLE test PARTITION (p='$part')
 
 Review comment:
   updated to use two-space indentation like PruneFileSourcePartitionsSuite.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368792589
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("PruneHiveTablePartitions", Once,
+EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going throuhg 
PruneHiveTablePartitions") {
+withTable("test", "temp") {
+  withTempDir { dir =>
+sql(
+  s"""
+ |CREATE EXTERNAL TABLE test(i int)
 
 Review comment:
   actually not necessary, already changed to managed table.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368792589
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("PruneHiveTablePartitions", Once,
+EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going throuhg 
PruneHiveTablePartitions") {
+withTable("test", "temp") {
+  withTempDir { dir =>
+sql(
+  s"""
+ |CREATE EXTERNAL TABLE test(i int)
 
 Review comment:
   actually not necessary, already changed to managed table. thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368428800
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("PruneHiveTablePartitions", Once,
+EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going throuhg 
PruneHiveTablePartitions") {
+withTable("test", "temp") {
+  withTempDir { dir =>
+sql(
+  s"""
+ |CREATE EXTERNAL TABLE test(i int)
+ |PARTITIONED BY (p int)
+ |STORED AS textfile
+ |LOCATION '${dir.toURI}'""".stripMargin)
+
+spark.range(0, 1000, 1).selectExpr("id as col")
+  .createOrReplaceTempView("temp")
+
+for (part <- Seq(1, 2, 3, 4)) {
+  sql(s"""
+ |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+ |select col from temp""".stripMargin)
+}
+val singlePartitionSizeInBytes = 3890
+val analyzed1 = sql("select i from test where 
p>0").queryExecution.analyzed
+val analyzed2 = sql("select i from test where 
p=1").queryExecution.analyzed
+assert(Optimize.execute(analyzed1).stats.sizeInBytes === 
singlePartitionSizeInBytes*4*12/16)
 
 Review comment:
   sure, will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368422320
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-19 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368396718
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta,
+  pushedDownPartitionFilters = 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-19 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368395173
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta,
+  pushedDownPartitionFilters = 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368196763
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367971538
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367971538
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367836420
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   so we don't need change here, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367836199
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+partitionFilters.toSeq, conf.sessionLocalTimeZone)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367810100
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   you mean totally remove this if clause and always call   
"session.sessionState.catalog.listPartitionsByFilter" ?
   it means always pushing down to hive metadata for pruning and also make sure 
returned partitions are exactly what we want. is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367810100
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   you mean totally remove this if clause and always call   
"session.sessionState.catalog.listPartitionsByFilter" ?
   it means already pushing down to hive metadata for pruning and also make 
sure returned partitions are exactly what we want. is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367810100
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   you mean totally remove this if statement and always call   
"session.sessionState.catalog.listPartitionsByFilter" ?
   it means already pushing down to hive metadata for pruning and also make 
sure returned partitions are exactly what we want. is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367810100
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   you mean totally remove this if clause and always call   
"session.sessionState.catalog.listPartitionsByFilter" ?
   it means already pushing down to hive metadata for pruning and also make 
sure returned partitions are exactly what we want. is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-16 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367314517
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   yes, through "spark.sql.optimizer.excludedRules".
   Already updated back.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367247006
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   you mean adding a config to control whether we should prune table partitions 
in optimization phase ?
   And we can check the config in apply , this config can default to be true.
   And this config can also be checked in PruneFileSourcePartitions.apply.
   Is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367247006
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   you mean adding a config to control whether we should prune hive table 
partitions in optimization phase ?
   And we can check the config in apply , this config can default to be true.
   And this config can also be checked in PruneFileSourcePartitions.apply.
   Is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367247006
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   you mean adding a config to control whether we should prune hive table 
partitions in optimization phase ?
   And we can check the config in apply , this config can default to be true.
   Is that expected ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367247006
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   you mean adding a config to control whether we should prune hive table 
partitions ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366763893
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  rawDataSize.get
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  totalSize.get
+} else {
+  0L
 
 Review comment:
   make sense, will udpate.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366763185
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters.toSeq)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  rawDataSize.get
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  totalSize.get
+} else {
+  0L
+}
+  }.sum
+} catch {
+  case e: IOException =>
 
 Review comment:
   yea, it will not now. it may throw if we get size from file system.
   will update it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366761579
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): ExpressionSet = {
+val normalizedFilters = DataSourceStrategy.normalizeExprs(
+  filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f)), relation.output)
+val partitionColumnSet = AttributeSet(relation.partitionCols)
+ExpressionSet(normalizedFilters.filter { f =>
+  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+})
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+  relation: HiveTableRelation,
+  partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+if (conf.metastorePartitionPruning) {
 
 Review comment:
   what about also pruning it in PruneHiveTablePartitions.prunePartitions even 
metastorePartitionPruning is not enabled.
   Because metastorePartitionPruning should control whether pushing down to 
hive metastore for early pruning instead of whether do pruning.
   Pruning should be done no matter metastorePartitionPruning is enabled or not.
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366657181
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
 
 Review comment:
   sorry for missing this, updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366369564
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   yea, updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366232583
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   yes, it is. Just checked.
   Then the HiveTableScanExec should also be updated. right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366221330
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   yea, make sense. What about creating a new PR for it and update it in both 
PruneHiveTablePartitions and HiveTableScanExec after it finished ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366133849
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
+  require(filter.dataType == BooleanType,
+s"Data type of predicate $filter must be ${BooleanType.catalogString} 
rather than " +
+  s"${filter.dataType.catalogString}.")
+  BindReferences.bindReference(filter, relation.partitionCols)
+}
+if (shouldKeep.nonEmpty) {
+  partitions.filter{ partition =>
+val hivePartition =
+  HiveClientImpl.toHivePartition(partition, 
HiveClientImpl.toHiveTable(relation.tableMeta))
+val dataTypes = relation.partitionCols.map(_.dataType)
+val castedValues = hivePartition.getValues.asScala.zip(dataTypes)
+  .map { case (value, dataType) => cast(Literal(value), 
dataType).eval(null) }
+val row = InternalRow.fromSeq(castedValues)
+shouldKeep.get.eval(row).asInstanceOf[Boolean]
+  }
+} else {
+  partitions
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeInBytes = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366135547
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366135537
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366135519
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366135486
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   Like in HiveTableScanExec, it calls prunePartitions(rawPartitions) in 
doExecute, while the rawPartitions already call 
ExternalCatalog.listPartitionsByFilter itself if 
HIVE_METASTORE_PARTITION_PRUNING enabled. 
   
   It push the partition filters down into hive metastore for earlier pruning, 
but it still prune it again in prunePartitions, because some predicates may not 
be supported in hive metastore, like "b like 'xyz'".
   
   So i think it is necessary here, WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366133849
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: 
Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
+  require(filter.dataType == BooleanType,
+s"Data type of predicate $filter must be ${BooleanType.catalogString} 
rather than " +
+  s"${filter.dataType.catalogString}.")
+  BindReferences.bindReference(filter, relation.partitionCols)
+}
+if (shouldKeep.nonEmpty) {
+  partitions.filter{ partition =>
+val hivePartition =
+  HiveClientImpl.toHivePartition(partition, 
HiveClientImpl.toHiveTable(relation.tableMeta))
+val dataTypes = relation.partitionCols.map(_.dataType)
+val castedValues = hivePartition.getValues.asScala.zip(dataTypes)
+  .map { case (value, dataType) => cast(Literal(value), 
dataType).eval(null) }
+val row = InternalRow.fromSeq(castedValues)
+shouldKeep.get.eval(row).asInstanceOf[Boolean]
+  }
+} else {
+  partitions
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeInBytes = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-08 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r364587799
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(
+  relation: HiveTableRelation,
+  partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter(
+  relation.tableMeta.identifier,
+  partitionFilters)
+val sizeInBytes = try {
+  val sizeOfPartitions = prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  rawDataSize.get
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  totalSize.get
+} else {
+  CommandUtils.calculateLocationSize(
+session.sessionState, relation.tableMeta.identifier, 
partition.storage.locationUri)
+}
+  }.sum
+  // If size of partitions is zero fall back to the default size.
+  if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from HDFS.", e)
+conf.defaultSizeInBytes
+}
+val newTableMeta =
+  if (relation.tableMeta.stats.isDefined) {
+relation.tableMeta.copy(
+  stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = 
BigInt(sizeInBytes
 
 Review comment:
   could be inconsistent, eg. rowCount and sizeInBytes may be inconsistent 
after this rule.
   Restored to creating new CatalogStatistics instance. But by doing so, some 
statistics may be lost which should not impact accuracy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-08 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r364587799
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(
+  relation: HiveTableRelation,
+  partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter(
+  relation.tableMeta.identifier,
+  partitionFilters)
+val sizeInBytes = try {
+  val sizeOfPartitions = prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  rawDataSize.get
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  totalSize.get
+} else {
+  CommandUtils.calculateLocationSize(
+session.sessionState, relation.tableMeta.identifier, 
partition.storage.locationUri)
+}
+  }.sum
+  // If size of partitions is zero fall back to the default size.
+  if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from HDFS.", e)
+conf.defaultSizeInBytes
+}
+val newTableMeta =
+  if (relation.tableMeta.stats.isDefined) {
+relation.tableMeta.copy(
+  stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = 
BigInt(sizeInBytes
 
 Review comment:
   could be inconsistent, eg. rowCount and sizeInBytes may be inconsistent 
after this rule.
   So restored to creating new CatalogStatistics instance. But by doing so, 
some statistics may be lost which should not impact accuracy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-08 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r364587799
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(
+  relation: HiveTableRelation,
+  partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter(
+  relation.tableMeta.identifier,
+  partitionFilters)
+val sizeInBytes = try {
+  val sizeOfPartitions = prunedPartitions.map { partition =>
+val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  rawDataSize.get
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  totalSize.get
+} else {
+  CommandUtils.calculateLocationSize(
+session.sessionState, relation.tableMeta.identifier, 
partition.storage.locationUri)
+}
+  }.sum
+  // If size of partitions is zero fall back to the default size.
+  if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from HDFS.", e)
+conf.defaultSizeInBytes
+}
+val newTableMeta =
+  if (relation.tableMeta.stats.isDefined) {
+relation.tableMeta.copy(
+  stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = 
BigInt(sizeInBytes
 
 Review comment:
   could be inconsistent, eg. rowCount and sizeInBytes is inconsistent.
   Restored to creating new CatalogStatistics instance. But by doing so, some 
statistics may be lost which should not impact accuracy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: 

[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-05 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363164009
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1375,6 +1375,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM =
+buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum")
+.doc("If the number of table partitions exceed this value, falling back to 
hdfs " +
+  "for statistics calculation is not allowed. This is used to avoid 
calculating " +
+  "the size of a large number of partitions through hdfs, which is very 
time consuming." +
+  "Setting this value to 0 or negative will disable falling back to hdfs 
for " +
+  "partition statistic calculation.")
 
 Review comment:
   yea, will create new PR for it. Already removed from this PR. thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-04 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016268
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1375,6 +1375,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM =
+buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum")
+.doc("If the number of table partitions exceed this value, falling back to 
hdfs " +
+  "for statistics calculation is not allowed. This is used to avoid 
calculating " +
+  "the size of a large number of partitions through hdfs, which is very 
time consuming." +
+  "Setting this value to 0 or negative will disable falling back to hdfs 
for " +
+  "partition statistic calculation.")
 
 Review comment:
   Yes, in PruneFileSourcePartitions, it also may lead to calculating size of 
large number of partitions through hdfs.
   I will create a follow-up PR to refine it after this PR finished.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
 
 Review comment:
   DetermineTableStats is Analyzer rule, while the pruned partitions and the 
size of them must be calculated 
   after filter push-down optimizers executed. So we can not put this part in 
DetermineTableStats now.
   But I will check whether the DetermineTableStats can be moved to 
optimization phase and put after PruneHiveTablePartitions. If any 
idea/suggestion, please share. thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
 
 Review comment:
   DetermineTableStats is Analyzer rule, while the pruned partitions and the 
size of them must be calculated 
   after filter push-down optimizers executed. So we can not put this part in 
DetermineTableStats now.
   But I will check whether the DetermineTableStats can be moved to Optimizer 
and put after PruneHiveTablePartitions. If any idea/suggestion, please share. 
thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016673
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
+if (partitionsWithSize.count(_._2==0) <= 
conf.fallBackToHdfsForStatsMaxPartitionNum) {
+  partitionsWithSize.map{ pair =>
+val (part, size) = (pair._1, pair._2)
+if (size == 0) {
+  CommandUtils.calculateLocationSize(
+session.sessionState, relation.tableMeta.identifier, 
part.storage.locationUri)
+} else {
+  size
+}
+  }.sum
+} else {
+  partitionsWithSize.filter(_._2>0).map(_._2).sum
+}
+  // If size of partitions is zero fall back to the default size.
+  if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from HDFS.", e)
+conf.defaultSizeInBytes
+}
+val withStats =
+  if (relation.tableMeta.stats.isDefined) {
+relation.tableMeta.copy(
+  stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = 
BigInt(sizeInBytes
+  } else {
+relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+  }
+relation.copy(tableMeta = withStats, prunedPartitions = 
Some(prunedPartitions))
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
 
 Review comment:
   DetermineTableStats is Analyzer rule, and the pruned partitions and the size 
of them must be calculated after filter push-down optimizers executed. So we 
can not put this part in DetermineTableStats now.
   But I will check whether the DetermineTableStats can be moved to Optimizer 
and put after PruneHiveTablePartitions. If any idea/suggestion, please share. 
thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
 
 Review comment:
   DetermineTableStats is Analyzer rule, and the size of pruned partitions must 
be calculated after filter push-down optimizers executed. So we can not put 
this part in DetermineTableStats now.
   But I will check whether the DetermineTableStats can be moved to Optimizer 
and put after PruneHiveTablePartitions. If any idea/suggestion, please share. 
thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016444
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): 
Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
 
 Review comment:
   yea, updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016430
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016268
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1375,6 +1375,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM =
+buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum")
+.doc("If the number of table partitions exceed this value, falling back to 
hdfs " +
+  "for statistics calculation is not allowed. This is used to avoid 
calculating " +
+  "the size of a large number of partitions through hdfs, which is very 
time consuming." +
+  "Setting this value to 0 or negative will disable falling back to hdfs 
for " +
+  "partition statistic calculation.")
 
 Review comment:
   Yes, in PruneFileSourcePartitions, it also may lead to calculating size of 
large number of partitions through hdfs.
   I will create a follow-up PR to refine it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-03 Thread GitBox
fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363013610
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
 
 Review comment:
   sure, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org