Github user tbertelsen commented on the pull request:
https://github.com/apache/spark/pull/5572#issuecomment-94175140
Thanks for taking a crack at this. It helps a lot, but I see a few problems :)
1. It requires the user to know about, and remember to set, the
configuration property.
2. The property will pollute the cache for all non-cartesian operations,
which I think will be very expensive.
Can't we find a solution that doesn't have these problems?

Idea 1
------
A simple no-brainer (once you see it) would be to change
`CartesianRDD.compute()` from
```scala
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
     y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
```
to
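```scala
val currSplit = split.asInstanceOf[CartesianPartition]
// Fetch rdd2's partition once and buffer it: an Iterator can only be traversed
// once, so reusing a bare iterator would pair only the first x with rdd2's values.
val values2 = rdd2.iterator(currSplit.s2, context).toVector
for (x <- rdd1.iterator(currSplit.s1, context);
     y <- values2) yield (x, y)
```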
The current implementation calls `rdd2.iterator` once for each value in
`currSplit.s1`. The change above calls it only once per Cartesian partition,
reducing the number of fetches from `rdd2` to one per partition of `rdd1`
rather than one per value in `rdd1`. That would be a huge improvement, but we
might still fetch the same partition of `rdd1` or `rdd2` multiple times on the
same worker.

I would argue that this change is needed even if `rdd2` is cached locally,
since it reduces the number of times we query the cache.

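To make the fetch counts concrete, here is a minimal, Spark-free sketch in plain Scala. `FetchCounter` is a hypothetical stand-in for `rdd2.iterator` that just counts how often a partition is "fetched"; none of these names are Spark API. It compares the current per-value pattern, a single reused iterator, and a single fetch buffered into a collection.

```scala
// Hypothetical stand-in for rdd2.iterator: counts how many times the
// "remote" partition is fetched. Not Spark API.
final class FetchCounter[A](data: Seq[A]) {
  var fetches = 0
  def iterator(): Iterator[A] = { fetches += 1; data.iterator }
}

object CartesianFetchDemo {
  // Current pattern: the inner generator calls ys.iterator() once per x.
  def perValue[A, B](xs: Seq[A], ys: FetchCounter[B]): Vector[(A, B)] =
    (for (x <- xs.iterator; y <- ys.iterator()) yield (x, y)).toVector

  // One fetch, but the iterator is exhausted after the first x,
  // so later x values produce no pairs.
  def reusedIterator[A, B](xs: Seq[A], ys: FetchCounter[B]): Vector[(A, B)] = {
    val it2 = ys.iterator()
    (for (x <- xs.iterator; y <- it2) yield (x, y)).toVector
  }

  // One fetch and the full product, at the cost of holding the
  // partition in memory.
  def buffered[A, B](xs: Seq[A], ys: FetchCounter[B]): Vector[(A, B)] = {
    val values2 = ys.iterator().toVector
    (for (x <- xs.iterator; y <- values2) yield (x, y)).toVector
  }

  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3)
    val a = new FetchCounter(Seq("a", "b"))
    val b = new FetchCounter(Seq("a", "b"))
    val c = new FetchCounter(Seq("a", "b"))
    println(s"per-value: ${perValue(xs, a).size} pairs, ${a.fetches} fetches")
    println(s"reused:    ${reusedIterator(xs, b).size} pairs, ${b.fetches} fetches")
    println(s"buffered:  ${buffered(xs, c).size} pairs, ${c.fetches} fetches")
  }
}
```

Running `CartesianFetchDemo.main` reports 6 pairs with 3 fetches for the per-value version, 2 pairs with 1 fetch for the reused iterator, and 6 pairs with 1 fetch for the buffered version, which is why fetching once also means keeping the partition's values around.
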
Idea 2
-------
Correct me if I am wrong, but it seems to me that
- we always want to cache remote fetches when we do a cartesian product
- we never want to cache remote fetches for any other operation

So ideally we would be able to turn caching on or off depending on the type of
operation we perform. I have practically no knowledge of how caching works in
Spark, but your proposed solution seems to provide a way to turn caching on or
off. Instead of having the configuration decide, though, it should be decided
by a parameter passed down from "somewhere".

That "somewhere" could be `CartesianRDD.compute()`, as shown below. But again,
I am not a committer and have no knowledge of Spark's internal design, so there
might be a better place.
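```scala
val currSplit = split.asInstanceOf[CartesianPartition]
// `cache = true` is a proposed parameter; RDD.iterator does not take it today.
// rdd2's partition is buffered because an Iterator can only be traversed once.
val values2 = rdd2.iterator(currSplit.s2, context, cache = true).toVector
for (x <- rdd1.iterator(currSplit.s1, context, cache = true);
     y <- values2) yield (x, y)
```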
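
For illustration, here is a rough, Spark-free sketch in plain Scala of how such a per-call flag might behave on one worker. `PartitionSource`, `remoteFetches`, `cachedSplits` and the `cache` parameter are all hypothetical; Spark's actual `RDD.iterator(split, context)` takes no caching argument, and this is not how Spark stores cached blocks internally.

```scala
import scala.collection.mutable

// Hypothetical model of a worker reading partitions of a remote RDD.
// Nothing here is Spark API; `cache` mirrors the parameter proposed above.
final class PartitionSource[A](remote: Map[Int, Seq[A]]) {
  var remoteFetches = 0
  private val localCache = mutable.Map.empty[Int, Vector[A]]

  private def fetch(split: Int): Vector[A] = {
    remoteFetches += 1
    remote(split).toVector
  }

  // cache = true: keep the fetched partition on this worker for reuse
  // (what a cartesian product wants). cache = false: fetch every time and
  // leave the local cache untouched (what other operations want).
  def iterator(split: Int, cache: Boolean = false): Iterator[A] =
    if (cache) localCache.getOrElseUpdate(split, fetch(split)).iterator
    else fetch(split).iterator

  def cachedSplits: Set[Int] = localCache.keySet.toSet
}

object CacheFlagDemo {
  def main(args: Array[String]): Unit = {
    val source = new PartitionSource(Map(0 -> Seq(1, 2), 1 -> Seq(3, 4)))
    // Two cartesian-style reads of split 0: only the first goes remote.
    source.iterator(0, cache = true).toVector
    source.iterator(0, cache = true).toVector
    // A non-cartesian read of split 1 neither uses nor fills the cache.
    source.iterator(1).toVector
    println(s"remote fetches: ${source.remoteFetches}, cached: ${source.cachedSplits}")
  }
}
```

Running `CacheFlagDemo.main` prints `remote fetches: 2, cached: Set(0)`. Defaulting `cache` to `false` means only callers that opt in, such as a Cartesian product, populate the worker-local cache, which addresses both problems listed at the top: nothing for the user to configure, and no cache pollution from other operations.
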