Github user fjh100456 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22693#discussion_r230726170
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession)
extends Rule[LogicalPlan] {
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan]
{
override def apply(plan: LogicalPlan): LogicalPlan = plan
resolveOperators {
+ case filterPlan @ Filter(_, SubqueryAlias(_, relation:
HiveTableRelation)) =>
+ val predicates =
PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+ computeTableStats(relation, predicates)
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) &&
relation.tableMeta.stats.isEmpty =>
- val table = relation.tableMeta
- val sizeInBytes = if
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = session.sessionState.newHadoopConf()
- val tablePath = new Path(table.location)
- val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
- fs.getContentSummary(tablePath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- session.sessionState.conf.defaultSizeInBytes
- }
- } else {
- session.sessionState.conf.defaultSizeInBytes
+ computeTableStats(relation)
+ }
+
+ private def computeTableStats(
+ relation: HiveTableRelation,
+ predicates: Seq[Expression] = Nil): LogicalPlan = {
+ val table = relation.tableMeta
+ val sizeInBytes = if
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = session.sessionState.newHadoopConf()
+ val tablePath = new Path(table.location)
+ val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+ BigInt(fs.getContentSummary(tablePath).getLength)
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
}
+ } else {
+ getSizeInBytesFromTablePartitions(table.identifier, predicates)
+ }
+ val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes
= sizeInBytes)))
+ relation.copy(tableMeta = withStats)
+ }
- val withStats = table.copy(stats =
Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
- relation.copy(tableMeta = withStats)
+ private def getSizeInBytesFromTablePartitions(
+ tableIdentifier: TableIdentifier,
+ predicates: Seq[Expression] = Nil): BigInt = {
+ session.sessionState.catalog.listPartitionsByFilter(tableIdentifier,
predicates) match {
--- End diff --
Your solution seems much better, @wangyum.
Thank you — I'll close this PR.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]