ulysses-you commented on a change in pull request #33079:
URL: https://github.com/apache/spark/pull/33079#discussion_r658527265



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##########
@@ -46,45 +49,42 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       // Some bytes per partition are 0 and total size is less than the target 
size.
       // 1 coalesced partition is expected.
       val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5, 30))
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs :: 
Nil, targetSize)
     }
 
     {
       // 2 coalesced partitions are expected.
       val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 3), 
CoalescedPartitionSpec(3, 5))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
+      val expectedPartitionSpecs =
+        Seq(CoalescedPartitionSpec(0, 3, 100), CoalescedPartitionSpec(3, 5, 
20))
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs :: 
Nil, targetSize)
     }
 
     {
       // There are a few large shuffle partitions.
       val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
       val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
+        CoalescedPartitionSpec(0, 1, 110),
+        CoalescedPartitionSpec(1, 2, 10),
+        CoalescedPartitionSpec(2, 3, 100),
+        CoalescedPartitionSpec(3, 4, 110))
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs :: 
Nil, targetSize)
     }
 
     {
       // All shuffle partitions are larger than the targeted size.
+      // return Nil if cannot coalesce
       val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4),
-        CoalescedPartitionSpec(4, 5))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
+      checkEstimation(Array(bytesByPartitionId), Nil, targetSize)

Review comment:
       The test is wrong, since we return Nil if we cannot coalesce.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##########
@@ -109,87 +109,86 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       // 1 coalesced partition is expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs1 = Seq(CoalescedPartitionSpec(0, 5, 30))
+      val expectedPartitionSpecs2 = Seq(CoalescedPartitionSpec(0, 5, 70))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 2 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 2, 10),
+        CoalescedPartitionSpec(2, 4, 20),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 2, 30),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 4 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 99, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 1, 0),
+        CoalescedPartitionSpec(1, 2, 99),
+        CoalescedPartitionSpec(2, 4, 20),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 1, 30),
+        CoalescedPartitionSpec(1, 2, 0),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 2 coalesced partition are needed.
       val bytesByPartitionId1 = Array[Long](0, 100, 0, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 1, 0),
+        CoalescedPartitionSpec(1, 2, 100),
+        CoalescedPartitionSpec(2, 4, 30),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 1, 30),
+        CoalescedPartitionSpec(1, 2, 0),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // There are a few large shuffle partitions.
+      // return Nil if cannot coalesce
       val bytesByPartitionId1 = Array[Long](0, 100, 40, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4),
-        CoalescedPartitionSpec(4, 5))
-      checkEstimation(
-        Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
-        targetSize)
+      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Nil, 
targetSize)

Review comment:
       Ditto: the test is wrong, since we return Nil if we cannot coalesce.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##########
@@ -36,7 +36,10 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       Seq.fill(mapOutputStatistics.length)(None),
       targetSize,
       minNumPartitions)
-    assert(estimatedPartitionStartIndices.forall(_ === 
expectedPartitionStartIndices))
+    assert(estimatedPartitionStartIndices.length === 
expectedPartitionStartIndices.length)
+    estimatedPartitionStartIndices.zip(expectedPartitionStartIndices).foreach {
+      case (actual, expect) => assert(actual === expect)
+    }

Review comment:
       The previous assert has no effect if the result
`estimatedPartitionStartIndices` is empty, because `forall` on an empty
collection is vacuously true.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##########
@@ -109,87 +109,86 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       // 1 coalesced partition is expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs1 = Seq(CoalescedPartitionSpec(0, 5, 30))
+      val expectedPartitionSpecs2 = Seq(CoalescedPartitionSpec(0, 5, 70))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 2 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 2, 10),
+        CoalescedPartitionSpec(2, 4, 20),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 2, 30),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 4 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 99, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 1, 0),
+        CoalescedPartitionSpec(1, 2, 99),
+        CoalescedPartitionSpec(2, 4, 20),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 1, 30),
+        CoalescedPartitionSpec(1, 2, 0),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // 2 coalesced partition are needed.
       val bytesByPartitionId1 = Array[Long](0, 100, 0, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 4),
-        CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 1, 0),
+        CoalescedPartitionSpec(1, 2, 100),
+        CoalescedPartitionSpec(2, 4, 30),
+        CoalescedPartitionSpec(4, 5, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 1, 30),
+        CoalescedPartitionSpec(1, 2, 0),
+        CoalescedPartitionSpec(2, 4, 70),
+        CoalescedPartitionSpec(4, 5, 30))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize)
     }
 
     {
       // There are a few large shuffle partitions.
+      // return Nil if cannot coalesce
       val bytesByPartitionId1 = Array[Long](0, 100, 40, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4),
-        CoalescedPartitionSpec(4, 5))
-      checkEstimation(
-        Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
-        targetSize)
+      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Nil, 
targetSize)
     }
 
     {
       // All pairs of shuffle partitions are larger than the targeted size.
+      // return Nil if cannot coalesce
       val bytesByPartitionId1 = Array[Long](100, 100, 40, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(1, 2),
-        CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4),
-        CoalescedPartitionSpec(4, 5))
-      checkEstimation(
-        Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
-        targetSize)
+      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Nil, 
targetSize)

Review comment:
       Ditto: the test is wrong, since we return Nil if we cannot coalesce.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##########
@@ -256,39 +264,39 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     {
       // 1 shuffle: All bytes per partition are 0, 1 empty partition spec 
created.
       val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5, 0))
+      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs :: 
Nil, targetSize)
     }
 
     {
       // 2 shuffles: All bytes per partition are 0, 1 empty partition spec 
created.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs1 = Seq(CoalescedPartitionSpec(0, 5, 0))
+      val expectedPartitionSpecs2 = Seq(CoalescedPartitionSpec(0, 5, 0))
       checkEstimation(
-        Array(bytesByPartitionId1, bytesByPartitionId2), 
expectedPartitionSpecs, targetSize)
+        Array(bytesByPartitionId1, bytesByPartitionId2),
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
+        targetSize)
     }
 
     {
       // No partition spec created for the 0-size partitions.
       val bytesByPartitionId1 = Array[Long](200, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](100, 0, 300, 0, 0)
-      val expectedPartitionSpecs = Seq(
-        CoalescedPartitionSpec(0, 1),
-        CoalescedPartitionSpec(2, 3))
+      val expectedPartitionSpecs1 = Seq(
+        CoalescedPartitionSpec(0, 1, 200),
+        CoalescedPartitionSpec(2, 3, 0))
+      val expectedPartitionSpecs2 = Seq(
+        CoalescedPartitionSpec(0, 1, 100),
+        CoalescedPartitionSpec(2, 3, 300))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
+        Seq(expectedPartitionSpecs1, expectedPartitionSpecs2),
         targetSize, minNumPartitions)
     }
   }
 
-  test("return Nil if cannot coalesce") {

Review comment:
       We don't need this test, since it is already covered by the previous test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to