wangyum commented on pull request #33286:
URL: https://github.com/apache/spark/pull/33286#issuecomment-880268176


   After partitions are pruned, the row count changes, but the column
   statistics are left unchanged, so a column's null count can end up higher
   than the row count. This is one way to avoid that:
   
   ```diff
   Index: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala	(revision 7566db603305ef04db89768bbb50d8b978a5c93c)
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala	(date 1626303508996)
   @@ -102,13 +102,17 @@
            // Change table stats based on the sizeInBytes of pruned files
            val filteredStats =
              FilterEstimation(Filter(partitionKeyFilters.reduce(And), logicalRelation)).estimate
   +        val rowCount = filteredStats.flatMap(_.rowCount)
            val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) =>
   -          (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
   +          val columnStat = colStat.toCatalogColumnStat(attr.name, attr.dataType)
   +          val distinctCount = columnStat.distinctCount.map(d => rowCount.map(_.min(d)).getOrElse(d))
   +          val nullCount = columnStat.nullCount.map(d => rowCount.map(_.min(d)).getOrElse(d))
   +          (attr.name, columnStat.copy(distinctCount = distinctCount, nullCount = nullCount))
            })
            val withStats = logicalRelation.catalogTable.map(_.copy(
              stats = Some(CatalogStatistics(
                sizeInBytes = BigInt(prunedFileIndex.sizeInBytes),
   -            rowCount = filteredStats.flatMap(_.rowCount),
   +            rowCount = rowCount,
                colStats = colStats.getOrElse(Map.empty)))))
            val prunedLogicalRelation = logicalRelation.copy(
              relation = prunedFsRelation, catalogTable = withStats)
   
   ```
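
To make the capping step in the patch easier to reason about in isolation, here is a minimal standalone sketch of the same `Option` logic. `ColumnStatSketch`, `CapStatsSketch`, `capToRowCount` and `capColumnStat` are hypothetical names used only for illustration; they are not Spark classes or methods, and only the two fields touched by the patch are modelled.

```scala
// Hypothetical stand-in for a catalog column statistic; it models only the
// two optional counts that the patch clamps.
final case class ColumnStatSketch(
    distinctCount: Option[BigInt],
    nullCount: Option[BigInt])

object CapStatsSketch {
  // Clamp a statistic to the row count when both are known.
  // An unknown statistic stays None; an unknown row count leaves the
  // statistic unchanged.
  def capToRowCount(stat: Option[BigInt], rowCount: Option[BigInt]): Option[BigInt] =
    stat.map(s => rowCount.map(_.min(s)).getOrElse(s))

  // Apply the clamp to both counts, mirroring the shape of the patch.
  def capColumnStat(cs: ColumnStatSketch, rowCount: Option[BigInt]): ColumnStatSketch =
    cs.copy(
      distinctCount = capToRowCount(cs.distinctCount, rowCount),
      nullCount = capToRowCount(cs.nullCount, rowCount))
}
```

With a pruned row count of 100, a table-level null count of 1000 is clamped to 100, a distinct count of 5 stays at 5, and a missing statistic stays `None`. The clamp only enforces an upper bound; it does not re-estimate the statistics for the remaining partitions.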


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


