cloud-fan commented on code in PR #52153:
URL: https://github.com/apache/spark/pull/52153#discussion_r2309677041


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -1406,6 +1406,87 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
     assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
   }
+
+  test("SPARK-53401: repartitionById should throw an exception for negative 
partition id") {
+    val df = spark.range(10).toDF("id")
+    val repartitioned = df.repartitionById(10, $"id" - 5)
+
+    val e = intercept[SparkException] {
+      repartitioned.collect()
+    }
+    // The underlying ArrayIndexOutOfBoundsException is wrapped in a SparkException.
+    assert(e.getMessage.contains("Index -5 out of bounds"))
+  }
+
+  test("SPARK-53401: repartitionById should throw an exception for partition 
id >= numPartitions") {
+    val numPartitions = 10
+    val df = spark.range(20).toDF("id")
+    val repartitioned = df.repartitionById(numPartitions, $"id")
+
+    val e = intercept[SparkException] {
+      repartitioned.collect()
+    }
+    // ArrayIndexOutOfBoundsException for partition IDs >= numPartitions
+    assert(e.getMessage.contains("out of bounds"))
+  }
+
+  /**
+   * A helper function to check the number of shuffle exchanges in a physical plan.
+   *
+   * @param df The DataFrame whose physical plan will be examined.
+   * @param expectedShuffles The expected number of shuffle exchanges.
+   */
+  private def checkShuffleCount(df: DataFrame, expectedShuffles: Int): Unit = {
+    val plan = df.queryExecution.executedPlan
+    val shuffles = collect(plan) {
+      case s: ShuffleExchangeLike => s
+    }
+    assert(
+      shuffles.size == expectedShuffles,
+      s"Expected $expectedShuffles shuffle(s), but found ${shuffles.size} in 
the plan:\n$plan"
+    )
+  }
+
+  test("SPARK-53401: groupBy on a superset of partition keys should reuse the 
shuffle") {
+    val df = spark.range(100).select($"id" % 10 as "key1", $"id" as "value")
+    val grouped = df.repartitionById(10, $"key1").groupBy($"key1", lit(1)).count()
+    checkShuffleCount(grouped, 1)
+  }
+
+  test("SPARK-53401: shuffle reuse is not affected by 
spark.sql.shuffle.partitions") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+      val grouped = df.repartitionById(10, $"key").groupBy($"key").count()
+
+      checkShuffleCount(grouped, 1)
+      // The explicit repartition number should be respected, not the config value.
+      assert(grouped.rdd.getNumPartitions == 10)
+    }
+  }
+
+  test("SPARK-53401: shuffle reuse with intervening narrow operations") {
+    val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+    val grouped =
+      df.repartitionById(10, $"key")
+        .filter($"value" > 50).groupBy($"key").count()
+    checkShuffleCount(grouped, 1)
+  }
+
+  test("SPARK-53401: shuffle reuse after a join that preserves partitioning") {

Review Comment:
   I think a more interesting test is to prove that a join with id pass-through and hash partitioning will still do a shuffle on the id pass-through side.
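
   For illustration, a minimal sketch of the suggested test might look like the following. It assumes the `repartitionById` API and the `checkShuffleCount` helper from the diff above; disabling broadcast joins to force a shuffle join, and the expected shuffle count of 3, are assumptions rather than behavior verified against this PR:

   ```scala
   test("SPARK-53401: join requiring hash partitioning re-shuffles the pass-through side") {
     // Assumption: disable broadcast joins so the join requires hash
     // clustering on both sides (e.g. via a sort-merge join).
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val left = spark.range(100)
         .select($"id" % 10 as "key", $"id" as "value")
         .repartitionById(10, $"key")
       val right = spark.range(100)
         .select($"id" % 10 as "key", $"id" * 2 as "other")
       val joined = left.join(right, "key")
       // Assumed count: one shuffle for the explicit repartitionById plus one
       // per join side, because the direct-partition-id partitioning on the
       // left does not satisfy the join's required hash clustering.
       checkShuffleCount(joined, 3)
     }
   }
   ```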



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

