wilmerdooley commented on code in PR #56621:
URL: https://github.com/apache/spark/pull/56621#discussion_r3482718955


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala:
##########
@@ -55,7 +67,8 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
   override def default(p: LogicalPlan): Statistics = p match {
     case p: LeafNode => p.computeStats()
     case _: LogicalPlan =>
-      Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
+      val sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product
+      cappedStats(sizeInBytes)

Review Comment:
   Done. In `default()` I now bound each child size with 
`.min(EstimationUtils.MAX_SIZE_IN_BYTES)` before the `.product`, so an uncapped 
leaf size from `computeStats()` can no longer blow up the running product 
before the cap is applied. I also lifted the cap constant into 
`EstimationUtils` so both visitors share one definition.
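
   A minimal standalone sketch of the bounding described above, using plain 
   `BigInt` instead of Spark's classes. `CappedProductSketch`, `MaxSizeInBytes`, 
   `cappedProduct`, and the choice of `Long.MaxValue` as the cap are assumptions 
   for illustration only; the real constant is `EstimationUtils.MAX_SIZE_IN_BYTES`, 
   and its value is not shown in this thread.

```scala
object CappedProductSketch {
  // Hypothetical cap value; stands in for EstimationUtils.MAX_SIZE_IN_BYTES.
  val MaxSizeInBytes: BigInt = BigInt(Long.MaxValue)

  // Bound every child size before multiplying, then bound the product too,
  // so a single oversized child cannot inflate the intermediate result
  // beyond cap^n for n children.
  def cappedProduct(childSizes: Seq[BigInt]): BigInt =
    childSizes
      .filter(_ > 0L)             // ignore non-positive sizes, as default() does
      .map(_.min(MaxSizeInBytes)) // per-child bound
      .product
      .min(MaxSizeInBytes)        // final bound on the composite estimate
}
```

   With this shape, a child reporting four times the cap contributes at most 
   the cap itself, and the composite estimate never exceeds `MaxSizeInBytes`.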



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala:
##########
@@ -193,6 +193,30 @@ test("range with invalid long value") {
   checkStats(range, rangeStats, rangeStats)
 }
 
+  test("SPARK-52163: cap estimated size in bytes to avoid BigInt overflow") {
+    val hugeAttr = attr("id")
+    // A leaf node whose estimated size is already at the maximum representable physical size.
+    val hugePlan = StatsTestPlan(
+      outputList = Seq(hugeAttr),
+      attributeStats = AttributeMap(Seq(hugeAttr -> colStat)),
+      rowCount = BigInt(Long.MaxValue),
+      size = Some(BigInt(Long.MaxValue)))
+
+    // A cartesian join multiplies children's sizes, so without a cap the size would explode and
+    // eventually overflow the backing BigInt when compounded across repeated self-joins. The
+    // estimate must stay bounded and the cartesian product itself must be capped.
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {

Review Comment:
   Mirrored on the cost-based path. `BasicStatsPlanVisitor` now caps composite 
estimates (both size and row count) in a `visit()` override, so the join and 
aggregate estimators are bounded too, not only `default()`. Leaf nodes pass 
through uncapped to preserve real source sizes. I added a CBO-on case here that 
runs the self-join loop with `spark.sql.cbo.enabled=true`.
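
   The rule described above (cap composite estimates, pass leaf estimates 
   through) can be sketched without Spark as follows. Every name here 
   (`CompositeCapSketch`, `Plan`, `Leaf`, `Join`, `Stats`, `Cap`, `capped`) is 
   hypothetical, and the join estimate is simplified to a plain product; the 
   real estimators in `BasicStatsPlanVisitor` are more involved.

```scala
object CompositeCapSketch {
  // Hypothetical cap value, chosen only for this illustration.
  val Cap: BigInt = BigInt(Long.MaxValue)

  final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

  sealed trait Plan
  final case class Leaf(stats: Stats) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Bound both the size and, when present, the row count.
  def capped(s: Stats): Stats =
    Stats(s.sizeInBytes.min(Cap), s.rowCount.map(_.min(Cap)))

  def visit(p: Plan): Stats = p match {
    // Leaf estimates pass through untouched to preserve source sizes.
    case Leaf(stats) => stats
    // Composite estimates are capped after they are computed.
    case Join(l, r) =>
      val ls = visit(l)
      val rs = visit(r)
      val rows = for (a <- ls.rowCount; b <- rs.rowCount) yield a * b
      capped(Stats(ls.sizeInBytes * rs.sizeInBytes, rows))
  }
}
```

   Because each `Join` result is capped before its parent multiplies it again, 
   a loop of repeated self-joins keeps both estimates at or below `Cap` instead 
   of squaring them on every iteration.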



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala:
##########
@@ -193,6 +193,30 @@ test("range with invalid long value") {
   checkStats(range, rangeStats, rangeStats)
 }
 
+  test("SPARK-52163: cap estimated size in bytes to avoid BigInt overflow") {
+    val hugeAttr = attr("id")
+    // A leaf node whose estimated size is already at the maximum representable physical size.
+    val hugePlan = StatsTestPlan(

Review Comment:
   Added. `StatisticsCollectionSuite` now has an end-to-end test mirroring the 
repro: an empty DataFrame self-joined in a loop with a checkpoint each 
iteration, then `explain("cost")`. It checks that the estimated size stays 
bounded and that `explain("cost")` no longer overflows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

