cloud-fan commented on code in PR #52153:
URL: https://github.com/apache/spark/pull/52153#discussion_r2309677041
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -1406,6 +1406,87 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
     assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
   }
+
+  test("SPARK-53401: repartitionById should throw an exception for negative partition id") {
+    val df = spark.range(10).toDF("id")
+    val repartitioned = df.repartitionById(10, $"id" - 5)
+
+    val e = intercept[SparkException] {
+      repartitioned.collect()
+    }
+    // The underlying ArrayIndexOutOfBoundsException surfaces wrapped in a SparkException.
+    assert(e.getMessage.contains("Index -5 out of bounds"))
+  }
+
+  test("SPARK-53401: repartitionById should throw an exception for partition id >= numPartitions") {
+    val numPartitions = 10
+    val df = spark.range(20).toDF("id")
+    val repartitioned = df.repartitionById(numPartitions, $"id")
+
+    val e = intercept[SparkException] {
+      repartitioned.collect()
+    }
+    // An ArrayIndexOutOfBoundsException is thrown for partition IDs >= numPartitions.
+    assert(e.getMessage.contains("out of bounds"))
+  }
+
+  /**
+   * A helper function to check the number of shuffle exchanges in a physical plan.
+   *
+   * @param df The DataFrame whose physical plan will be examined.
+   * @param expectedShuffles The expected number of shuffle exchanges.
+   */
+  private def checkShuffleCount(df: DataFrame, expectedShuffles: Int): Unit = {
+    val plan = df.queryExecution.executedPlan
+    val shuffles = collect(plan) {
+      case s: ShuffleExchangeLike => s
+    }
+    assert(
+      shuffles.size == expectedShuffles,
+      s"Expected $expectedShuffles shuffle(s), but found ${shuffles.size} in the plan:\n$plan"
+    )
+  }
+
+  test("SPARK-53401: groupBy on a superset of partition keys should reuse the shuffle") {
+    val df = spark.range(100).select($"id" % 10 as "key1", $"id" as "value")
+    val grouped = df.repartitionById(10, $"key1").groupBy($"key1", lit(1)).count()
+    checkShuffleCount(grouped, 1)
+  }
+
+  test("SPARK-53401: shuffle reuse is not affected by spark.sql.shuffle.partitions") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+      val grouped = df.repartitionById(10, $"key").groupBy($"key").count()
+
+      checkShuffleCount(grouped, 1)
+      // The explicit repartition number should be respected, not the config value.
+      assert(grouped.rdd.getNumPartitions == 10)
+    }
+  }
+
+  test("SPARK-53401: shuffle reuse with intervening narrow operations") {
+    val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+    val grouped =
+      df.repartitionById(10, $"key")
+        .filter($"value" > 50).groupBy($"key").count()
+    checkShuffleCount(grouped, 1)
+  }
+
+  test("SPARK-53401: shuffle reuse after a join that preserves partitioning") {

Review Comment:
   I think a more interesting test is to prove that a join with id pass-through and hash partitioning will still do a shuffle on the id pass-through side.
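Below is a minimal sketch of the test the reviewer suggests, reusing the `checkShuffleCount` helper from the diff above. The test name, the expected shuffle count of 3, and the premise that id-pass-through partitioning cannot satisfy a join's required hash-clustered distribution are illustrative assumptions about the planner's behavior, not something confirmed in this thread:

```scala
// Hypothetical test sketch (not from the PR): joining an
// id-pass-through-partitioned side with a plain side should still
// shuffle the pass-through side into hash partitioning.
test("SPARK-53401: join re-shuffles the id pass-through side") {
  // Disable broadcast joins so the join must use shuffle-based exchanges.
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
    val left = spark.range(100)
      .select($"id" % 10 as "key", $"id" as "value")
      .repartitionById(10, $"key")
    val right = spark.range(100)
      .select($"id" % 10 as "key", $"id" * 2 as "value2")

    val joined = left.join(right, "key")
    // Assumed plan shape: the explicit pass-through shuffle, plus one hash
    // shuffle per join side, because pass-through partitioning does not
    // satisfy the join's hash distribution requirement.
    checkShuffleCount(joined, 3)
  }
}
```

If the planner could instead co-locate the join on the pass-through partitioning, the expected count would drop accordingly; the point of the test is to pin down which of the two behaviors the rule actually produces.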