This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7d36329  [SPARK-38271] PoissonSampler may output more rows than MaxRows
7d36329 is described below

commit 7d363294b7af212836e7a444ad82c716f3560278
Author: Ruifeng Zheng <ruife...@foxmail.com>
AuthorDate: Tue Feb 22 21:04:43 2022 +0800

    [SPARK-38271] PoissonSampler may output more rows than MaxRows
    
    ### What changes were proposed in this pull request?
    When `withReplacement` is true, `Sample.maxRows` returns `None`.
    
    ### Why are the changes needed?
    The underlying implementation of `SampleExec` cannot guarantee that its 
number of output rows is <= `Sample.maxRows`.
    
    ```
    scala> val df = spark.range(0, 1000)
    df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> df.count
    res0: Long = 1000
    
    scala> df.sample(true, 0.999999, 10).count
    res1: Long = 1004
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    existing test suites
    
    Closes #35593 from zhengruifeng/fix_sample_maxRows.
    
    Authored-by: Ruifeng Zheng <ruife...@foxmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit b68327968a7a5f7ac1afa9cc270204c9eaddcb75)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala  |  6 +++++-
 .../spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 13 +++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 7f33f28..6748db5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1344,7 +1344,11 @@ case class Sample(
       s"Sampling fraction ($fraction) must be on interval [0, 1] without 
replacement")
   }
 
-  override def maxRows: Option[Long] = child.maxRows
+  override def maxRows: Option[Long] = {
+    // when withReplacement is true, PoissonSampler is applied in SampleExec,
+    // which may output more rows than child.maxRows.
+    if (withReplacement) None else child.maxRows
+  }
   override def output: Seq[Attribute] = child.output
 
   override protected def withNewChildInternal(newChild: LogicalPlan): Sample =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 46e9dea..d3cbaa8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -159,6 +159,19 @@ class CombiningLimitsSuite extends PlanTest {
     )
   }
 
+  test("SPARK-38271: PoissonSampler may output more rows than child.maxRows") {
+    val query = testRelation.select().sample(0, 0.2, true, 1)
+    assert(query.maxRows.isEmpty)
+    val optimized = Optimize.execute(query.analyze)
+    assert(optimized.maxRows.isEmpty)
+    // can not eliminate Limit since Sample.maxRows is None
+    checkPlanAndMaxRow(
+      query.limit(10),
+      query.limit(10),
+      10
+    )
+  }
+
   test("SPARK-33497: Eliminate Limit if Deduplicate max rows not larger than 
Limit") {
     checkPlanAndMaxRow(
       testRelation.deduplicate("a".attr).limit(10),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to