fuwhu commented on a change in pull request #26805: [SPARK-15616][SQL] Add
optimizer rule PruneHiveTablePartitions
URL: https://github.com/apache/spark/pull/26805#discussion_r363016643
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
##########
@@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends
Rule[LogicalPlan] {
}
}
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make
hive as a data source.
+ */
+case class PruneHiveTablePartitions(session: SparkSession)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ /**
+ * Extract the partition filters from the filters on the table.
+ */
+ private def extractPartitionPruningFilters(filters: Seq[Expression],
+ relation: HiveTableRelation):
Seq[Expression] = {
+ val normalizedFilters = filters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+ val partitionSet = AttributeSet(relation.partitionCols)
+ normalizedFilters.filter { predicate =>
+ !predicate.references.isEmpty &&
predicate.references.subsetOf(partitionSet)
+ }
+ }
+
+ /**
+ * Prune the hive table using filters on the partitions of the table,
+ * and also update the statistics of the table.
+ */
+ private def prunedHiveTableWithStats(relation: HiveTableRelation,
+ partitionFilters: Seq[Expression]):
HiveTableRelation = {
+ val conf = session.sessionState.conf
+ val prunedPartitions =
session.sharedState.externalCatalog.listPartitionsByFilter(
+ relation.tableMeta.database,
+ relation.tableMeta.identifier.table,
+ partitionFilters,
+ conf.sessionLocalTimeZone)
+ val sizeInBytes = try {
+ val partitionsWithSize = prunedPartitions.map { part =>
+ val rawDataSize =
part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+ val totalSize =
part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ if (rawDataSize.isDefined && rawDataSize.get > 0) {
+ (part, rawDataSize.get)
+ } else if (totalSize.isDefined && totalSize.get > 0L) {
+ (part, totalSize.get)
+ } else {
+ (part, 0L)
+ }
+ }
+ val sizeOfPartitions =
Review comment:
DetermineTableStats is an Analyzer rule, whereas the pruned partitions and
their sizes must be calculated
after the filter push-down optimizer rules have been executed. So we cannot put
this part in DetermineTableStats for now.
However, I will check whether DetermineTableStats can be moved to the Optimizer
and placed after PruneHiveTablePartitions. If you have any ideas or suggestions,
please share them. Thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]