GitHub user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/20091#discussion_r162778292
--- Diff:
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with
SharedSparkContext {
assert(joined.getNumPartitions == rdd2.getNumPartitions)
}
+ test("cogroup between multiple RDD when defaultParallelism is set
without proper partitioner") {
+ assert(!sc.conf.contains("spark.default.parallelism"))
+ try {
+ sc.conf.set("spark.default.parallelism", "4")
+ val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+ val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+ val joined = rdd1.cogroup(rdd2)
+ assert(joined.getNumPartitions == sc.defaultParallelism)
+ } finally {
+ sc.conf.remove("spark.default.parallelism")
+ }
+ }
+
+ test("cogroup between multiple RDD when defaultParallelism is set with
proper partitioner") {
+ assert(!sc.conf.contains("spark.default.parallelism"))
+ try {
+ sc.conf.set("spark.default.parallelism", "4")
+ val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+ val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ .partitionBy(new HashPartitioner(10))
+ val joined = rdd1.cogroup(rdd2)
+ assert(joined.getNumPartitions == rdd2.getNumPartitions)
+ } finally {
+ sc.conf.remove("spark.default.parallelism")
+ }
+ }
+
+ test("cogroup between multiple RDD when defaultParallelism is set with
huge number of " +
--- End diff --
nit: reword the test name as "set; with huge number of partitions in upstream RDDs" (i.e., add a semicolon after "set").
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org