GitHub user ala commented on a diff in the pull request:
https://github.com/apache/spark/pull/20664#discussion_r170611946
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -1129,6 +1129,36 @@ class RDDSuite extends SparkFunSuite with
SharedSparkContext {
}.collect()
}
+ test("SPARK-23496: order of input partitions can result in severe skew
in coalesce") {
+ val numInputPartitions = 100
+ val numCoalescedPartitions = 50
+ val locations = Array("locA", "locB")
+
+ val inputRDD = sc.makeRDD(Range(0, numInputPartitions).toArray[Int],
numInputPartitions)
+ assert(inputRDD.getNumPartitions == numInputPartitions)
+
+ val locationPrefRDD = new LocationPrefRDD(inputRDD, { (p: Partition) =>
+ if (p.index < numCoalescedPartitions) {
+ Seq(locations(0))
+ } else {
+ Seq(locations(1))
+ }
+ })
+ val coalescedRDD = new CoalescedRDD(locationPrefRDD,
numCoalescedPartitions)
+
+ val numPartsPerLocation = coalescedRDD
+ .getPartitions
+ .map(coalescedRDD.getPreferredLocations(_).head)
+ .groupBy(identity)
+ .mapValues(_.size)
+
+ // Without the fix these would be:
+ // numPartsPerLocation(locations(0)) == numCoalescedPartitions - 1
+ // numPartsPerLocation(locations(1)) == 1
+ assert(numPartsPerLocation(locations(0)) > 0.4 *
numCoalescedPartitions)
--- End diff --
Added a comment about the flakiness and fixed the seed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]