[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-02-09 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r376844512
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
 
 Review comment:
   classdoc is good enough





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368849527
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+           |CREATE TABLE test(i int)
+           |PARTITIONED BY (p int)
+           |STORED AS textfile""".stripMargin)
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+      }
+      val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
 
 Review comment:
   nit: `p > 0`
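   A minimal before/after of this nit, using the line from the diff above:

   ```scala
   // Before: sql("select i from test where p>0")
   val analyzed1 = sql("select i from test where p > 0").queryExecution.analyzed
   ```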





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368849212
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
 
 Review comment:
   nit `forall(_ > 0)`
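   The same change as a before/after sketch:

   ```scala
   // Before: if (sizeOfPartitions.forall(s => s>0)) { ... }
   // After, with the underscore placeholder doing the same thing:
   if (sizeOfPartitions.forall(_ > 0)) {
     // body unchanged
   }
   ```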





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368849571
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+           |CREATE TABLE test(i int)
+           |PARTITIONED BY (p int)
+           |STORED AS textfile""".stripMargin)
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+      }
+      val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
+      val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed
 
 Review comment:
   ditto
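   "Ditto" refers to the same spacing nit on this line, i.e.:

   ```scala
   // Before: sql("select i from test where p=1")
   val analyzed2 = sql("select i from test where p = 1").queryExecution.analyzed
   ```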





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368849466
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+           |CREATE TABLE test(i int)
+           |PARTITIONED BY (p int)
+           |STORED AS textfile""".stripMargin)
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
 
 Review comment:
   please keep the style of multiline strings consistent; 
https://github.com/apache/spark/pull/26805/files#diff-90836f7778a704901d7d3df02846aa55R39 
shows the correct style.
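   Concretely, a sketch of the loop body reworked to match the linked snippet's style (the opening s""" on its own line, margins aligned before stripMargin):

   ```scala
   sql(
     s"""
        |INSERT OVERWRITE TABLE test PARTITION (p='$part')
        |select col from temp""".stripMargin)
   ```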





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368849637
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+           |CREATE TABLE test(i int)
+           |PARTITIONED BY (p int)
+           |STORED AS textfile""".stripMargin)
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+      }
+      val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
+      val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed
+      assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 ===
 
 Review comment:
   ditto
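   That is, the arithmetic in the assertion gets the same operator spacing:

   ```scala
   // Before: Optimize.execute(analyzed1).stats.sizeInBytes/4
   // After:
   Optimize.execute(analyzed1).stats.sizeInBytes / 4
   ```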





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368420662
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      withTempDir { dir =>
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE test(i int)
+             |PARTITIONED BY (p int)
+             |STORED AS textfile
+             |LOCATION '${dir.toURI}'""".stripMargin)
+
+        spark.range(0, 1000, 1).selectExpr("id as col")
+          .createOrReplaceTempView("temp")
+
+        for (part <- Seq(1, 2, 3, 4)) {
+          sql(s"""
+                 |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+                 |select col from temp""".stripMargin)
+        }
+        val singlePartitionSizeInBytes = 3890
+        val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed
+        val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed
+        assert(Optimize.execute(analyzed1).stats.sizeInBytes === singlePartitionSizeInBytes*4*12/16)
 
 Review comment:
   Checking the exact file size can be flaky. Can we just check that the first 
size is more than 3 times larger than the second size?
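   A sketch of that assertion, reusing analyzed1 and analyzed2 from the test above (the val names here are illustrative):

   ```scala
   val sizeAllPartitions = Optimize.execute(analyzed1).stats.sizeInBytes  // p>0 keeps all 4 partitions
   val sizeOnePartition = Optimize.execute(analyzed2).stats.sizeInBytes   // p=1 keeps 1 partition
   // Every partition holds the same 1000 rows, so the first estimate should be
   // about 4x the second; requiring only >3x keeps the test robust to small
   // per-file size differences.
   assert(sizeAllPartitions > sizeOnePartition * 3)
   ```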





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368420068
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      withTempDir { dir =>
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE test(i int)
 
 Review comment:
   does it have to be an external table?
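   For reference, the managed-table variant of this DDL drops the LOCATION clause (and with it the withTempDir wrapper); that is the shape the revised test quoted earlier in this thread uses:

   ```scala
   sql(
     s"""
        |CREATE TABLE test(i int)
        |PARTITIONED BY (p int)
        |STORED AS textfile""".stripMargin)
   ```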





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-20 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368419917
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-19 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368390534
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta,
+          pushedDownPartitionFilters =

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-19 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368389539
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r368037093
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367909947
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   yes





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367909881
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(s => s>0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367826109
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make Hive a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeOfPartitions = prunedPartitions.map { partition =>
+  val rawDataSize = 
partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+  val totalSize = 
partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+  if (rawDataSize.isDefined && rawDataSize.get > 0) {
+rawDataSize.get
+  } else if (totalSize.isDefined && totalSize.get > 0L) {
+totalSize.get
+  } else {
+0L
+  }
+}
+if (sizeOfPartitions.forall(s => s>0)) {
+  val sizeInBytes = sizeOfPartitions.sum
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case op @ PhysicalOperation(projections, filters, relation: 
HiveTableRelation)
+  if filters.nonEmpty && relation.isPartitioned && 
relation.prunedPartitions.isEmpty =>
+  val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+  if (partitionKeyFilters.nonEmpty) {
+val newPartitions = prunePartitions(relation, partitionKeyFilters)
+val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+val newRelation = relation.copy(
+  tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-17 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367822954
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   sorry, I misread the code. So there are 2 optimizations:
   1. push down predicates to the Hive metastore
   2. push down predicates earlier to get precise data size info.

   This rule is for the second optimization, and this branch is for skipping the first optimization.

   makes sense to me
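
   To make the two optimizations concrete, a minimal sketch, assuming a Hive table `db.logs` partitioned by `dt` (the table name is made up for illustration):

   ```scala
   // Sketch only. `spark.sql.hive.metastorePartitionPruning` is the existing
   // config gating optimization 1; this PR's rule implements optimization 2.
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

   // Optimization 1: push the partition predicate down to the Hive metastore,
   // so only the matching partition metadata is fetched at all.
   spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

   // Optimization 2 (this rule): prune partitions during optimization, so the
   // table's size estimate counts only the partitions matching dt = '2020-01-01',
   // which can for example unlock broadcast joins.
   spark.sql("SELECT * FROM db.logs WHERE dt = '2020-01-01'").explain()
   ```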


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-16 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367788000
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
 
 Review comment:
   can we remove this branch? We should always do the optimization.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-16 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367304505
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
 
 Review comment:
   actually, nvm. Users can exclude optimizer rules to disable them. Let's 
follow `PruneFileSourcePartitions` and always do pruning.
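
   For reference, a minimal sketch of that exclusion mechanism, assuming an active `SparkSession` named `spark` and the rule's fully qualified class name as proposed in this PR (the merged name could differ):

   ```scala
   // spark.sql.optimizer.excludedRules is a real Spark config that disables
   // the listed optimizer rules by their fully qualified class names.
   spark.conf.set(
     "spark.sql.optimizer.excludedRules",
     "org.apache.spark.sql.hive.execution.PruneHiveTablePartitions")
   ```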


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-16 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367303681
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
 
 Review comment:
   yea


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r367229511
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
 
 Review comment:
   then can we add a new config for this optimization?
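
   For context, this is roughly what such a config could look like inside `SQLConf`, following its builder pattern; the key name and wording below are hypothetical, and the 2020-01-16 comment earlier in this digest settles on relying on excluded optimizer rules instead:

   ```scala
   // Hypothetical config entry; not part of the final PR.
   val PRUNE_HIVE_TABLE_PARTITIONS_ENABLED =
     SQLConf.buildConf("spark.sql.hive.pruneHiveTablePartitions.enabled")
       .doc("When true, prune the partitions of a Hive table using the " +
         "partition filters, and update the table statistics accordingly.")
       .booleanConf
       .createWithDefault(true)
   ```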


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366739954
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = try {
+      prunedPartitions.map { partition =>
+        val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+        val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        if (rawDataSize.isDefined && rawDataSize.get > 0) {
+          rawDataSize.get
+        } else if (totalSize.isDefined && totalSize.get > 0L) {
+          totalSize.get
+        } else {
+          0L
+        }
+      }.sum
+    } catch {
+      case e: IOException =>
 
 Review comment:
   how can an IOException be thrown here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366739843
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = try {
+      prunedPartitions.map { partition =>
+        val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+        val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        if (rawDataSize.isDefined && rawDataSize.get > 0) {
+          rawDataSize.get
+        } else if (totalSize.isDefined && totalSize.get > 0L) {
+          totalSize.get
+        } else {
+          0L
 
 Review comment:
   what if some partitions have size stats and some don't? I think it's safer to not change the table stats if any partition doesn't have size stats.
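
   A minimal sketch of that safer behavior (the helper below is illustrative, not the PR's final code; it mirrors the `forall` guard in the later revision quoted earlier in this digest):

   ```scala
   import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}

   // Only update the table's size stats when every pruned partition reported a
   // positive size; otherwise keep the original stats untouched.
   def updateStatsIfComplete(tableMeta: CatalogTable, sizes: Seq[Long]): CatalogTable = {
     if (sizes.nonEmpty && sizes.forall(_ > 0)) {
       tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizes.sum))))
     } else {
       tableMeta  // at least one partition lacks size stats
     }
   }
   ```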


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-15 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366738131
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
 
 Review comment:
   instead of checking the config here and listing all partitions when it is not enabled, we can simply check the config in `apply`:
   
https://github.com/apache/spark/pull/26805/files#diff-6be42cfa3c62a7536b1eb1d6447c073cR99
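
   A sketch of that suggestion, written as a fragment of the rule class (not the PR's final code):

   ```scala
   // Gate the whole rule on the config inside apply, so prunePartitions only
   // ever needs the listPartitionsByFilter path.
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
         if conf.metastorePartitionPruning && filters.nonEmpty && relation.isPartitioned =>
       // ... prune partitions and rewrite the relation as in the quoted code ...
       op
   }
   ```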


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366458689
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): Seq[Expression] = {
+    val normalizedFilters = filters.map { e =>
 
 Review comment:
   did you follow it? I don't see you filtering out non-deterministic filters and the other things that `PruneFileSourcePartitions.getPartitionKeyFilters` does.
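
   For reference, the revised `getPartitionKeyFilters` quoted earlier in this thread does exactly that; condensed:

   ```scala
   // Drop non-deterministic predicates and predicates containing subqueries
   // before normalizing and intersecting with the partition columns.
   val candidateFilters = filters.filter { f =>
     f.deterministic && !SubqueryExpression.hasSubquery(f)
   }
   val normalizedFilters =
     DataSourceStrategy.normalizeExprs(candidateFilters, relation.output)
   ```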


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366238516
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): Seq[Expression] = {
+    val normalizedFilters = filters.map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+      }
+    }
+    val partitionSet = AttributeSet(relation.partitionCols)
+    normalizedFilters.filter { predicate =>
+      !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+    }
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+      : Seq[CatalogTablePartition] = {
+    val partitions =
+      if (conf.metastorePartitionPruning) {
+        session.sessionState.catalog.listPartitionsByFilter(
+          relation.tableMeta.identifier, partitionFilters)
+      } else {
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+      }
+    val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   Yea, but we can leave it to a follow-up or fix it in this PR. The newly added code here should be updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366226969
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   I mean, it's already fixed: `ExternalCatalog.listPartitionsByFilter` guarantees that the returned partitions are exactly what we want.





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366219935
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   You are right: Hive may not support some predicates, so `HiveClientImpl.listPartitionByFilter` may return more partitions than we expect. But that should be fixed in `HiveExternalCatalog.listPartitionsByFilter`.
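
In other words, the catalog layer itself re-applies, on the client side, any predicate that Hive could not push down, so callers always get an exact answer. A minimal sketch of that contract, assuming Spark's `ExternalCatalogUtils.prunePartitionsByFilter` helper and a hypothetical wrapper name:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression

// Sketch: re-evaluate the partition predicates over whatever the metastore
// returned, dropping partitions that a partial Hive pushdown over-returned.
def listPartitionsByFilterExactly(
    table: CatalogTable,
    partitionsFromMetastore: Seq[CatalogTablePartition],
    predicates: Seq[Expression],
    timeZoneId: String): Seq[CatalogTablePartition] = {
  ExternalCatalogUtils.prunePartitionsByFilter(
    table, partitionsFromMetastore, predicates, timeZoneId)
}
```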





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-14 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r366219251
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
+  require(filter.dataType == BooleanType,
+s"Data type of predicate $filter must be ${BooleanType.catalogString} 
rather than " +
+  s"${filter.dataType.catalogString}.")
+  BindReferences.bindReference(filter, relation.partitionCols)
+}
+if (shouldKeep.nonEmpty) {
+  partitions.filter{ partition =>
+val hivePartition =
+  HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta))
+val dataTypes = relation.partitionCols.map(_.dataType)
+val castedValues = hivePartition.getValues.asScala.zip(dataTypes)
+  .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) }
+val row = InternalRow.fromSeq(castedValues)
+shouldKeep.get.eval(row).asInstanceOf[Boolean]
+  }
+} else {
+  partitions
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeInBytes = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && 

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r365788538
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
+  require(filter.dataType == BooleanType,
+s"Data type of predicate $filter must be ${BooleanType.catalogString} 
rather than " +
+  s"${filter.dataType.catalogString}.")
+  BindReferences.bindReference(filter, relation.partitionCols)
+}
+if (shouldKeep.nonEmpty) {
+  partitions.filter{ partition =>
+val hivePartition =
+  HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta))
+val dataTypes = relation.partitionCols.map(_.dataType)
+val castedValues = hivePartition.getValues.asScala.zip(dataTypes)
+  .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) }
+val row = InternalRow.fromSeq(castedValues)
+shouldKeep.get.eval(row).asInstanceOf[Boolean]
+  }
+} else {
+  partitions
+}
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+  tableMeta: CatalogTable,
+  prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+val sizeInBytes = try {
+  prunedPartitions.map { partition =>
+val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && 

[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r365787617
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
+  : Seq[CatalogTablePartition] = {
+val partitions =
+if (conf.metastorePartitionPruning) {
+  session.sessionState.catalog.listPartitionsByFilter(
+relation.tableMeta.identifier, partitionFilters)
+} else {
+  session.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+}
+val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter =>
 
 Review comment:
   Do we need this? AFAIK `ExternalCatalog.listPartitionsByFilter` already does it.
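
If that holds, the client-side `shouldKeep` re-evaluation could be dropped entirely. A sketch of the simplified shape (an assumption about where this could land, not the PR's code at this revision; `ExternalCatalogUtils.prunePartitionsByFilter` would replace the client-side evaluation machinery):

```scala
// Sketch: trust the catalog for exact pruning when metastore pruning is on;
// otherwise list everything and prune with the shared catalyst helper.
private def prunePartitions(
    relation: HiveTableRelation,
    partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = {
  if (conf.metastorePartitionPruning && partitionFilters.nonEmpty) {
    session.sessionState.catalog.listPartitionsByFilter(
      relation.tableMeta.identifier, partitionFilters)
  } else {
    ExternalCatalogUtils.prunePartitionsByFilter(
      relation.tableMeta,
      session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
      partitionFilters,
      conf.sessionLocalTimeZone)
  }
}
```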





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r365786611
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression])
 
 Review comment:
   nit:
   ```
   def f(
       para1: T,
       para2: T): R = ...
   ```
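
Applied to the signature under review, the suggested layout would read (formatting sketch only, body unchanged):

```scala
private def prunePartitions(
    relation: HiveTableRelation,
    partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = {
  // ... body as in the diff above ...
}
```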





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r365786376
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
+  relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
 
 Review comment:
   let's follow `PruneFileSourcePartitions.getPartitionKeyFilters`
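
For reference, that helper normalizes attribute names against the relation output and then keeps only the predicates that reference partition columns exclusively. A rough reconstruction of its shape (assuming `DataSourceStrategy.normalizeExprs` and the usual deterministic/subquery guards; not the authoritative source):

```scala
private def getPartitionKeyFilters(
    filters: Seq[Expression],
    relation: HiveTableRelation): ExpressionSet = {
  // Only deterministic, subquery-free predicates are safe to push down.
  val normalizedFilters = DataSourceStrategy.normalizeExprs(
    filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
    relation.output)
  val partitionColumnSet = AttributeSet(relation.partitionCols)
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionColumnSet)))
}
```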





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-13 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r365786236
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.types.BooleanType
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(
+  filters: Seq[Expression],
 
 Review comment:
   nit: 4 space indentation





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-05 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363162339
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1375,6 +1375,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM =
+buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum")
+.doc("If the number of table partitions exceed this value, falling back to 
hdfs " +
+  "for statistics calculation is not allowed. This is used to avoid 
calculating " +
+  "the size of a large number of partitions through hdfs, which is very 
time consuming." +
+  "Setting this value to 0 or negative will disable falling back to hdfs 
for " +
+  "partition statistic calculation.")
 
 Review comment:
   If this is a common problem, let's leave it here and open a new PR to fix it 
completely later.
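
For context, the proposed knob would be set like any other SQL conf; the value below is only an illustrative choice:

```scala
// Allow HDFS-based size estimation for at most 100 partitions. Per the
// proposed doc string, 0 or a negative value disables the fallback entirely.
spark.conf.set("spark.sql.statistics.fallBackToHdfs.maxPartitionNum", "100")
```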





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362397234
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
 
 Review comment:
   shall we do it in `DetermineTableStats`?





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362397107
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
 
 Review comment:
   We should call `SessionCatalog.listPartitionsByFilter`
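
Concretely, the suggestion is to go through the session catalog instead of reaching into the shared external catalog, roughly (sketch; names follow the quoted diff):

```scala
// SessionCatalog resolves the table identifier and passes the session's
// time zone down to the external catalog, so the caller no longer needs the
// database/table split or an explicit sessionLocalTimeZone argument.
val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter(
  relation.tableMeta.identifier, partitionFilters)
```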





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362396970
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1375,6 +1375,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM =
+buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum")
+.doc("If the number of table partitions exceed this value, falling back to 
hdfs " +
+  "for statistics calculation is not allowed. This is used to avoid 
calculating " +
+  "the size of a large number of partitions through hdfs, which is very 
time consuming." +
+  "Setting this value to 0 or negative will disable falling back to hdfs 
for " +
+  "partition statistic calculation.")
 
 Review comment:
   does the data source table have the same problem?





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362396308
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
 
 Review comment:
   let's move it to a separate file.





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362396438
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation): Seq[Expression] = {
+val normalizedFilters = filters.map { e =>
+  e transform {
+case a: AttributeReference =>
+  a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+  }
+}
+val partitionSet = AttributeSet(relation.partitionCols)
+normalizedFilters.filter { predicate =>
+  !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+}
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+   partitionFilters: Seq[Expression]): HiveTableRelation = {
+val conf = session.sessionState.conf
+val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
+  relation.tableMeta.database,
+  relation.tableMeta.identifier.table,
+  partitionFilters,
+  conf.sessionLocalTimeZone)
+val sizeInBytes = try {
+  val partitionsWithSize = prunedPartitions.map { part =>
+val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+if (rawDataSize.isDefined && rawDataSize.get > 0) {
+  (part, rawDataSize.get)
+} else if (totalSize.isDefined && totalSize.get > 0L) {
+  (part, totalSize.get)
+} else {
+  (part, 0L)
+}
+  }
+  val sizeOfPartitions =
+if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) {
+  partitionsWithSize.map{ pair =>
+val (part, size) = (pair._1, pair._2)
+if (size == 0) {
+  CommandUtils.calculateLocationSize(
+session.sessionState, relation.tableMeta.identifier, part.storage.locationUri)
+} else {
+  size
+}
+  }.sum
+} else {
+  partitionsWithSize.filter(_._2>0).map(_._2).sum
+}
+  // If size of partitions is zero fall back to the default size.
+  if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from HDFS.", e)
+conf.defaultSizeInBytes
+}
+val withStats =
+  if (relation.tableMeta.stats.isDefined) {
+relation.tableMeta.copy(
+  stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes))))
+  } else {
+relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+  }
+relation.copy(tableMeta = withStats, prunedPartitions = Some(prunedPartitions))
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
 
 Review comment:
   2 space indentation.





[GitHub] [spark] cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions

2020-01-02 Thread GitBox
cloud-fan commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r362396387
 
 

 ##
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
 
 Review comment:
   nit:
   ```
   def func(
       para1: T,
       para2: T...
   ```
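
Applied to `extractPartitionPruningFilters`, that layout would read (formatting sketch only, body unchanged):

```scala
private def extractPartitionPruningFilters(
    filters: Seq[Expression],
    relation: HiveTableRelation): Seq[Expression] = {
  // ... body as in the diff above ...
}
```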

