Github user tbertelsen commented on a diff in the pull request:
https://github.com/apache/spark/pull/6454#discussion_r31272686
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -580,7 +580,18 @@ abstract class RDD[T: ClassTag](
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
- new CartesianRDD(sc, this, other)
+ val numPartitions = {
+ val numExecutors = System.getProperty("spark.executor.instances")
+
+ if (numExecutors != null) {
+ numExecutors.toInt
+ } else {
+ sc.defaultMinPartitions
+ }
+ }
+ val coalesced = coalesce(numPartitions, shuffle = true)
+ val coalescedOther = other.coalesce(numPartitions, shuffle = true)
+ new CartesianRDD(sc, coalesced, coalescedOther)
--- End diff --
We want to coalesce to reduce the number of times a given partition is
fetched. Each partition is fetched once for every partition in the other RDD.
The Cartesian RDD has a number of partitions equal to the product of the number
of partitions in the two source RDDs. For each partition in the Cartesian RDD
we will do (asymptotically) one fetch.
RDDs typically have a number of partitions equal to default parallelism
(the number of cores). Ideally, Cartesian RDDs would have the same. If we don't
coalesce, we should expect them to have a number of partitions equal to the
square of default parallelism.
If, for example, we have 10 machines with 16 cores each, then default parallelism
will be 160. If we don't coalesce, the Cartesian RDD will have 25600 partitions.
2560 of those partitions can be calculated without a remote fetch, but the rest
will involve a remote fetch. This will result in a lot of redundant fetches if
we don't cache the partitions. To be exact, each partition will be fetched 144
times. If we coalesce, we can bring that down to 10 or 16.
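The arithmetic in the example above can be sketched in plain Scala, with no Spark dependency. This is only a rough model: `CartesianFetchModel`, `CartesianCost`, and the field names are made up for illustration, and it assumes both source RDDs have one partition per core, spread evenly over the machines.

```scala
// Hypothetical model of the fetch arithmetic; not part of Spark.
final case class CartesianCost(
    partitions: Int,                 // partitions in the uncoalesced Cartesian RDD
    localPartitions: Int,            // Cartesian partitions whose two inputs sit on the same machine
    remoteFetchesPerPartition: Int)  // times one source partition is fetched by other machines

object CartesianFetchModel {
  // Assumption: each source RDD has machines * coresPerMachine partitions,
  // with coresPerMachine partitions of each RDD on every machine.
  def uncoalesced(machines: Int, coresPerMachine: Int): CartesianCost = {
    val parallelism = machines * coresPerMachine
    // One Cartesian partition per pair of source partitions.
    val partitions = parallelism * parallelism
    // Each source partition has coresPerMachine co-located partners in the other RDD.
    val local = parallelism * coresPerMachine
    // The remaining partners live on other machines.
    val remotePerPartition = parallelism - coresPerMachine
    CartesianCost(partitions, local, remotePerPartition)
  }
}
```

With 10 machines and 16 cores each, `CartesianFetchModel.uncoalesced(10, 16)` reproduces the 25600, 2560 and 144 figures quoted above.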
I agree it can be difficult to predict the number of partitions for all
possible cases, but we can make a pretty good guess that would work well for most
applications. I think the best solution is to make that guess and let the user
override it if he or she has the domain knowledge to change the default.
I would suggest something like:
```
// Partition counts for the two sides of the Cartesian product
val numPartitions1 = numExecutors
val numPartitions2 = math.max(numExecutors, defaultParallelism / numExecutors)
```
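To make the suggestion above easier to try out, here is a minimal standalone sketch. `CartesianPartitioning.suggested` is a hypothetical helper, not an existing Spark API, and it assumes `numExecutors` and `defaultParallelism` are already known as plain integers.

```scala
// Hypothetical helper; not an existing Spark API.
object CartesianPartitioning {
  /** Returns (partitions for `this`, partitions for `other`) per the suggestion above. */
  def suggested(numExecutors: Int, defaultParallelism: Int): (Int, Int) = {
    // Guard against division by zero when no executor count is available.
    require(numExecutors > 0, "numExecutors must be positive")
    val numPartitions1 = numExecutors
    // Integer division: cores per executor when parallelism is the total core count.
    val numPartitions2 = math.max(numExecutors, defaultParallelism / numExecutors)
    (numPartitions1, numPartitions2)
  }
}
```

For the 10 machine, 16 core example, `CartesianPartitioning.suggested(10, 160)` gives `(10, 16)`, so the coalesced Cartesian RDD would have 10 * 16 = 160 partitions, the same as default parallelism.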