fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add 
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
 
 

 ##########
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
 ##########
 @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make 
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def extractPartitionPruningFilters(filters: Seq[Expression],
+                                             relation: HiveTableRelation): 
Seq[Expression] = {
+    val normalizedFilters = filters.map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+      }
+    }
+    val partitionSet = AttributeSet(relation.partitionCols)
+    normalizedFilters.filter { predicate =>
+      !predicate.references.isEmpty && 
predicate.references.subsetOf(partitionSet)
+    }
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table,
+   * and also update the statistics of the table.
+   */
+  private def prunedHiveTableWithStats(relation: HiveTableRelation,
+                                       partitionFilters: Seq[Expression]): 
HiveTableRelation = {
+    val conf = session.sessionState.conf
+    val prunedPartitions = 
session.sharedState.externalCatalog.listPartitionsByFilter(
+      relation.tableMeta.database,
+      relation.tableMeta.identifier.table,
+      partitionFilters,
+      conf.sessionLocalTimeZone)
+    val sizeInBytes = try {
+      val partitionsWithSize = prunedPartitions.map { part =>
+        val rawDataSize = 
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+        val totalSize = 
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        if (rawDataSize.isDefined && rawDataSize.get > 0) {
+          (part, rawDataSize.get)
+        } else if (totalSize.isDefined && totalSize.get > 0L) {
+          (part, totalSize.get)
+        } else {
+          (part, 0L)
+        }
+      }
+      val sizeOfPartitions =
 
 Review comment:
   DetermineTableStats is an Analyzer rule, while the pruned partitions and 
their sizes must be calculated 
   after the filter push-down optimizer rules have executed. So we cannot put 
this part in DetermineTableStats now.
   But I will check whether DetermineTableStats can be moved to the 
optimization phase and placed after PruneHiveTablePartitions. If you have any 
ideas or suggestions, please share them. Thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to