maropu commented on a change in pull request #30443:
URL: https://github.com/apache/spark/pull/30443#discussion_r546233178



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
##########
@@ -117,4 +128,111 @@ class CombiningLimitsSuite extends PlanTest {
       testRelation.select().groupBy()(count(1)).orderBy(count(1).asc).analyze)
     comparePlans(optimized4, expected4)
   }
+
+  test("SPARK-33497: Eliminate Limit if LocalRelation max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.select().limit(10),
+      testRelation.select(),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Range max rows not larger than Limit") 
{
+    checkPlan(
+      Range(0, 100, 1, None).select().limit(200),
+      Range(0, 100, 1, None).select(),
+      100
+    )
+    checkPlan(
+      Range(-1, Long.MaxValue, 1, None).select().limit(1),
+      Range(-1, Long.MaxValue, 1, None).select().limit(1),
+      1
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Sample max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.select().sample(upperBound = 0.2, seed = 1).limit(10),
+      testRelation.select().sample(upperBound = 0.2, seed = 1),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Deduplicate max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.deduplicate(Symbol("a")).limit(10),
+      testRelation.deduplicate(Symbol("a")),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Repartition max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.repartition(2).limit(10),
+      testRelation.repartition(2),
+      10
+    )
+    checkPlan(
+      testRelation.distribute(Symbol("a"))(2).limit(10),
+      testRelation.distribute(Symbol("a"))(2),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Join max rows not larger than Limit") {
+    checkPlan(
+      testRelation.join(testRelation2, joinType = Inner).limit(20),
+      testRelation.join(testRelation2, joinType = Inner),
+      20
+    )
+    checkPlan(
+      testRelation.join(testRelation2, joinType = FullOuter).limit(10),
+      testRelation.join(testRelation2, joinType = FullOuter).limit(10),
+      10
+    )
+    checkPlan(
+      testRelation.join(testRelation2, joinType = LeftSemi).limit(5),
+      testRelation.join(testRelation2.select(), joinType = LeftSemi).limit(5),
+      5
+    )
+    checkPlan(
+      testRelation.join(testRelation2, joinType = LeftAnti).limit(10),
+      testRelation.join(testRelation2.select(), joinType = LeftAnti),
+      10
+    )
+    checkPlan(
+      testRelation.join(testRelation3, joinType = LeftOuter).limit(100),
+      testRelation.join(testRelation3, joinType = LeftOuter).limit(100),
+      100
+    )
+    checkPlan(
+      testRelation.join(testRelation4, joinType = RightOuter).limit(100),
+      testRelation.join(testRelation4, joinType = RightOuter).limit(100),
+      100
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Window max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.window(
+        Seq(count(1).as("c")), Seq(Symbol("a")), 
Seq(Symbol("b").asc)).limit(20),
+      testRelation.window(
+        Seq(count(1).as("c")), Seq(Symbol("a")), Seq(Symbol("b").asc)),
+      10
+    )
+  }
+
+  private def checkPlan(

Review comment:
       nit: please rename `checkPlan` -> `checkPlanAndMaxRow`

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
##########
@@ -117,4 +128,111 @@ class CombiningLimitsSuite extends PlanTest {
       testRelation.select().groupBy()(count(1)).orderBy(count(1).asc).analyze)
     comparePlans(optimized4, expected4)
   }
+
+  test("SPARK-33497: Eliminate Limit if LocalRelation max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.select().limit(10),
+      testRelation.select(),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Range max rows not larger than Limit") 
{
+    checkPlan(
+      Range(0, 100, 1, None).select().limit(200),
+      Range(0, 100, 1, None).select(),
+      100
+    )
+    checkPlan(
+      Range(-1, Long.MaxValue, 1, None).select().limit(1),
+      Range(-1, Long.MaxValue, 1, None).select().limit(1),
+      1
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Sample max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.select().sample(upperBound = 0.2, seed = 1).limit(10),
+      testRelation.select().sample(upperBound = 0.2, seed = 1),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Deduplicate max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.deduplicate(Symbol("a")).limit(10),
+      testRelation.deduplicate(Symbol("a")),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Repartition max rows not larger than 
Limit") {
+    checkPlan(
+      testRelation.repartition(2).limit(10),
+      testRelation.repartition(2),
+      10
+    )
+    checkPlan(
+      testRelation.distribute(Symbol("a"))(2).limit(10),
+      testRelation.distribute(Symbol("a"))(2),
+      10
+    )
+  }
+
+  test("SPARK-33497: Eliminate Limit if Join max rows not larger than Limit") {

Review comment:
       This test case repeats several nearly identical checks, so could you make
it more compact, like this?
   ```
   Seq(Inner, FullOuter, LeftOuter, ...).foreach { joinType =>
     checkPlan(...)
   }
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -326,6 +327,25 @@ case class Join(
     hint: JoinHint)
   extends BinaryNode with PredicateHelper {
 
+  override def maxRows: Option[Long] = {
+    joinType match {
+      case Inner | Cross | FullOuter | LeftOuter | RightOuter
+        if left.maxRows.isDefined && right.maxRows.isDefined =>
+        val maxRows = BigInt(left.maxRows.get) * BigInt(right.maxRows.get)
+        if (maxRows.isValidLong) {
+          Some(maxRows.toLong)
+        } else {
+          None

Review comment:
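       Could you add tests for this overflow case (where the product of the left and
right max rows does not fit in a `Long`, so `maxRows` returns `None`) in
`test("SPARK-33497: Eliminate Limit if Join max rows not larger than Limit")`?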




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to